From ddd57a8dbd0705934aca6900a4ecb8296db5c144 Mon Sep 17 00:00:00 2001 From: Nika Hassani Date: Tue, 5 Sep 2023 17:11:47 -0700 Subject: [PATCH 1/4] feat(logging): enable log rotation and set retry --- .../dart_queued_item_store.stub.dart | 2 +- .../dart_queued_item_store.vm.dart | 2 +- .../dart_queued_item_store.web.dart | 18 +- .../drift/drift_queued_item_store.dart | 2 +- .../index_db/indexed_db_adapter.dart | 7 +- .../test/queued_item_store_test.dart | 4 +- .../lib/src/cloudwatch_logger_plugin.dart | 123 +++++- .../queued_item_store/queued_item_store.dart | 2 +- .../test/cloudwatch_logger_plugin_test.dart | 371 +++++++++++++++++- .../queued_item_store_test.dart | 4 +- 10 files changed, 493 insertions(+), 42 deletions(-) diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart index 74f5ac28b9..9e9fd28c3f 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.stub.dart @@ -42,7 +42,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable { } @override - FutureOr isFull(int maxSizeInMB) { + bool isFull(int maxSizeInMB) { throw UnimplementedError('isFull() has not been implemented.'); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart index 1cd2e348e4..f4144954d4 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.vm.dart @@ -53,7 +53,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable { } @override - Future isFull(int maxSizeInMB) { + bool isFull(int maxSizeInMB) { return _database.isFull(maxSizeInMB); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart index 0dc36a615c..c334bf2cdc 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart @@ -14,8 +14,8 @@ class DartQueuedItemStore // ignore: avoid_unused_constructor_parameters DartQueuedItemStore(String? storagePath); - late final Future _database = () async { - if (await IndexedDbAdapter.checkIsIndexedDBSupported()) { + late final QueuedItemStore _database = () { + if (IndexedDbAdapter.checkIsIndexedDBSupported()) { return IndexedDbAdapter(); } logger.warn( @@ -34,7 +34,7 @@ class DartQueuedItemStore String timestamp, { bool enableQueueRotation = false, }) async { - final db = await _database; + final db = _database; await db.addItem( string, timestamp, @@ -44,25 +44,25 @@ class DartQueuedItemStore @override Future deleteItems(Iterable items) async { - final db = await _database; + final db = _database; await db.deleteItems(items); } @override Future> getCount(int count) async { - final db = await _database; + final db = _database; return db.getCount(count); } @override Future> getAll() async { - final db = await _database; + final db = _database; return db.getAll(); } @override - Future isFull(int maxSizeInMB) async { - final db = await _database; + bool isFull(int maxSizeInMB) { + final db = _database; return db.isFull(maxSizeInMB); } @@ -70,7 +70,7 @@ class DartQueuedItemStore @override @visibleForTesting Future clear() async { - final db = await _database; + final db = _database; return db.clear(); } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart index b2f7406a0f..5508163dc6 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/drift/drift_queued_item_store.dart @@ -104,7 +104,7 @@ class DriftQueuedItemStore extends _$DriftQueuedItemStore } @override - Future isFull(int maxSizeInMB) async { + bool isFull(int maxSizeInMB) { final maxBytes = maxSizeInMB * 1024 * 1024; return _currentTotalByteSize >= maxBytes; } diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart index 37c1812e78..c4e4c13763 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/index_db/indexed_db_adapter.dart @@ -150,7 +150,7 @@ class IndexedDbAdapter implements QueuedItemStore { } @override - Future isFull(int maxSizeInMB) async { + bool isFull(int maxSizeInMB) { final maxBytes = maxSizeInMB * 1024 * 1024; return _currentTotalByteSize >= maxBytes; } @@ -167,15 +167,14 @@ class IndexedDbAdapter implements QueuedItemStore { void close() {} /// Check that IndexDB will work on this device. - static Future checkIsIndexedDBSupported() async { + static bool checkIsIndexedDBSupported() { if (indexedDB == null) { return false; } // indexedDB will be non-null in Firefox private browsing, // but will fail to open. try { - final openRequest = indexedDB!.open('test', 1); - await openRequest.future; + indexedDB!.open('test', 1).result; return true; } on Object { return false; diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart index 4c18fc12d8..c49b105780 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/test/queued_item_store_test.dart @@ -233,14 +233,14 @@ void main() { await db.addItem(largeItem, DateTime.now().toIso8601String()); } - var result = await db.isFull(capacityLimit); + var result = db.isFull(capacityLimit); expect(result, isFalse); for (var i = 0; i < 100; i++) { await db.addItem(largeItem, DateTime.now().toIso8601String()); } - result = await db.isFull(capacityLimit); + result = db.isFull(capacityLimit); expect(result, isTrue); }, ); diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart index e1df21b800..c33e8649fe 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart @@ -32,9 +32,11 @@ import 'package:meta/meta.dart'; const int _maxNumberOfLogEventsInBatch = 10000; const int _maxLogEventsBatchSize = 1048576; const int _baseBufferSize = 26; +const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay; const int _maxLogEventSize = 256000; -final int _maxLogEventsTimeSpanInBatch = - const Duration(hours: 24).inMilliseconds; +const int _maxLogEventTimeInFutureInMilliseconds = + Duration.millisecondsPerHour * 2; +const int _baseRetryIntervalInMilliseconds = Duration.millisecondsPerMinute; typedef _LogBatch = (List logQueues, List logEvents); @@ -123,7 +125,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin bool _enabled; StoppableTimer? _timer; RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider; - + int _retryCount = 0; + int? _retryTimeInMillisecondsSinceEpoch; set remoteLoggingConstraintProvider( RemoteLoggingConstraintProvider remoteProvider, ) { @@ -139,32 +142,110 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin Future startSyncing() async { final batchStream = _getLogBatchesToSync(); await for (final (logs, events) in batchStream) { - final response = await _sendToCloudWatch(events); - // TODO(nikahsn): handle tooOldLogEventEndIndex - // and expiredLogEventEndIndex. - if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) { - // TODO(nikahsn): throw and exception to enable log rotation if the - // log store is full. + _TooNewLogEventException? tooNewException; + while (logs.isNotEmpty && events.isNotEmpty) { + final rejectedLogEventsInfo = + (await _sendToCloudWatch(events)).rejectedLogEventsInfo; + final expiredLogEventEndIndex = + rejectedLogEventsInfo?.expiredLogEventEndIndex; + final tooOldLogEventEndIndex = + rejectedLogEventsInfo?.tooOldLogEventEndIndex; + final tooNewLogEventStartIndex = + rejectedLogEventsInfo?.tooNewLogEventStartIndex; + + if (expiredLogEventEndIndex != null && + expiredLogEventEndIndex >= 0 && + expiredLogEventEndIndex <= events.length - 1) { + // delete old logs from log store + await _logStore + .deleteItems(logs.sublist(0, expiredLogEventEndIndex + 1)); + // set logs to start from next + logs.removeRange(0, expiredLogEventEndIndex + 1); + // set events to start from next + events.removeRange(0, expiredLogEventEndIndex + 1); + continue; + } + if (tooOldLogEventEndIndex != null && + tooOldLogEventEndIndex >= 0 && + tooOldLogEventEndIndex <= events.length - 1) { + // delete old logs from log store + await _logStore + .deleteItems(logs.sublist(0, tooOldLogEventEndIndex + 1)); + // set logs to start from next + logs.removeRange(0, tooOldLogEventEndIndex + 1); + // set events to start from next + events.removeRange(0, tooOldLogEventEndIndex + 1); + continue; + } + if (tooNewLogEventStartIndex != null && + tooNewLogEventStartIndex >= 0 && + tooNewLogEventStartIndex <= events.length - 1) { + tooNewException = _TooNewLogEventException( + events[tooNewLogEventStartIndex].timestamp.toInt(), + ); + // set logs to end at the index + logs.removeRange(tooNewLogEventStartIndex, logs.length); + // set events to end at the index + events.removeRange(tooNewLogEventStartIndex, events.length); + continue; + } + await _logStore.deleteItems(logs); break; } - await _logStore.deleteItems(logs); + if (tooNewException != null) { + throw tooNewException; + } } } if (!_syncing) { - // TODO(nikahsn): disable log rotation. _syncing = true; + var nextRetry = 0; try { await startSyncing(); + } on _TooNewLogEventException catch (e) { + nextRetry = e.timeInMillisecondsSinceEpoch - + _maxLogEventTimeInFutureInMilliseconds; } on Exception catch (e) { logger.error('Failed to sync logs to CloudWatch.', e); - // TODO(nikahsn): enable log rotation if the log store is full } finally { + _handleFullLogStoreAfterSync( + retryTimeInMillisecondsSinceEpoch: nextRetry, + ); _syncing = false; } } } + void _handleFullLogStoreAfterSync({ + int retryTimeInMillisecondsSinceEpoch = 0, + }) { + final isLogStoreFull = + _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); + if (!isLogStoreFull) { + _retryCount = 0; + _retryTimeInMillisecondsSinceEpoch = null; + return; + } + if (retryTimeInMillisecondsSinceEpoch > 0 && + retryTimeInMillisecondsSinceEpoch > + DateTime.now().millisecondsSinceEpoch) { + _retryTimeInMillisecondsSinceEpoch = retryTimeInMillisecondsSinceEpoch; + return; + } + _retryCount += 1; + _retryTimeInMillisecondsSinceEpoch = DateTime.now().millisecondsSinceEpoch + + (_retryCount * _baseRetryIntervalInMilliseconds); + } + + bool _shouldSyncOnFullLogStore() { + if (_retryTimeInMillisecondsSinceEpoch == null) { + return true; + } + return DateTime.now().millisecondsSinceEpoch >= + _retryTimeInMillisecondsSinceEpoch!; + } + void _onTimerError(Object e) { logger.error('Failed to sync logs to CloudWatch.', e); } @@ -235,11 +316,18 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin return; } final item = logEntry.toQueuedItem(); + final isLogStoreFull = + _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); + final shouldEnableQueueRotation = + isLogStoreFull && _retryTimeInMillisecondsSinceEpoch != null; + await _logStore.addItem( item.value, item.timestamp, + enableQueueRotation: shouldEnableQueueRotation, ); - if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) { + + if (isLogStoreFull && _shouldSyncOnFullLogStore()) { await _startSyncingIfNotInProgress(); } } @@ -263,6 +351,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin _enabled = false; _timer?.stop(); await _logStore.clear(); + _retryCount = 0; + _retryTimeInMillisecondsSinceEpoch = null; } /// Sends logs on-demand to CloudWatch. @@ -295,3 +385,10 @@ extension on LogEntry { ); } } + +class _TooNewLogEventException implements Exception { + const _TooNewLogEventException( + this.timeInMillisecondsSinceEpoch, + ); + final int timeInMillisecondsSinceEpoch; +} diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart index 24e197d449..9317aa4878 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/queued_item_store/queued_item_store.dart @@ -24,7 +24,7 @@ abstract interface class QueuedItemStore { FutureOr> getAll(); /// Whether the queue size is reached [maxSizeInMB]. - FutureOr isFull(int maxSizeInMB); + bool isFull(int maxSizeInMB); /// Clear the queue of items. FutureOr clear(); diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart index fe69936682..e1980dd168 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart @@ -62,17 +62,34 @@ void main() { logStreamProvider: mockCloudWatchLogStreamProvider, ); }); + test('when enabled, logs are added to the item store', () async { - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) - .thenAnswer((_) async => Future.value(false)); + .thenReturn(false); + plugin.enable(); + await expectLater( plugin.handleLogEntry(errorLog), completes, ); - verify(() => mockQueuedItemStore.addItem(any(), any())).called(1); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).called(1); + verify( () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), ).called(1); @@ -138,8 +155,8 @@ void main() { when(() => mockCloudWatchLogStreamProvider.defaultLogStream) .thenAnswer((_) async => Future.value('log stream name')); - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); when(() => mockQueuedItemStore.deleteItems(any())) .thenAnswer((_) async => {}); @@ -184,8 +201,8 @@ void main() { when(() => mockCloudWatchLogStreamProvider.defaultLogStream) .thenAnswer((_) async => Future.value('log stream name')); - when(() => mockQueuedItemStore.addItem(any(), any())) - .thenAnswer((_) async => {}); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); when(() => mockQueuedItemStore.getAll()).thenAnswer( (_) async => Future>.value(queuedItems), @@ -239,6 +256,9 @@ void main() { (_) async => Future>.value(queuedItems), ); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); + await expectLater( plugin.flushLogs(), completes, @@ -276,6 +296,9 @@ void main() { (_) async => Future>.value(queuedItems), ); + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(false); + await expectLater( plugin.flushLogs(), completes, @@ -289,5 +312,337 @@ void main() { () => mockCloudWatchLogStreamProvider.createLogStream(any()), ).called(1); }); + + test('it enables log rotation when log store is full and retry is set', + () async { + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(true); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: true, + ), + ).called(1); + }); + + test( + 'it does not enable log rotation when log store is full if retry is not ' + 'set. it start sync on full log store.', + () async { + final isFullResponse = [false, true, false]; + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenAnswer((_) => isFullResponse.removeAt(0)); + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => + Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(2); + + verify( + () => mockQueuedItemStore.addItem( + any(), + any(), + enableQueueRotation: false, + ), + ).called(1); + }, + ); + + test('it does not sync on full log store if retry time not reached', + () async { + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async => Future.value(PutLogEventsResponse()), + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when(() => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB)) + .thenReturn(true); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + await expectLater( + plugin.handleLogEntry(errorLog), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(1); + + verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1); + }); + + test( + 'it deletes too old logs in the batch and sync the rest', + () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(tooOldLogEventEndIndex: 0), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => mockQueuedItemStore + .deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable).first, + queuedItems.first, + ); + expect( + (captures.captured.last as Iterable).first, + queuedItems.last, + ); + }, + ); + + test('it deletes expired logs in the batch and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(expiredLogEventEndIndex: 0), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable).first, + queuedItems.first, + ); + expect( + (captures.captured.last as Iterable).first, + queuedItems.last, + ); + }); + + test('it leaves too new logs in the batch and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: + RejectedLogEventsInfo(tooNewLogEventStartIndex: 1), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 1); + expect(captures.captured.length, 1); + expect( + (captures.captured.last as Iterable).first, + queuedItems.first, + ); + }); }); } diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart index ec2bca8874..ab035a6ca2 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/queued_item_store/queued_item_store_test.dart @@ -231,7 +231,7 @@ void main() { await db.addItem('0', DateTime.now().toIso8601String()); } - var result = await db.isFull(capacityLimit); + var result = db.isFull(capacityLimit); expect(result, isFalse); // add enough items to exceed capacity limit of 1mb @@ -239,7 +239,7 @@ void main() { await db.addItem('0', DateTime.now().toIso8601String()); } - result = await db.isFull(capacityLimit); + result = db.isFull(capacityLimit); expect(result, isTrue); }, ); From e6fd967d4f7ed6294a0869e512ce34f6374597cc Mon Sep 17 00:00:00 2001 From: Nika Hassani Date: Fri, 8 Sep 2023 15:18:55 -0700 Subject: [PATCH 2/4] address review comments --- .../dart_queued_item_store.web.dart | 18 ++-- .../lib/src/cloudwatch_logger_plugin.dart | 92 ++++++++++--------- .../test/cloudwatch_logger_plugin_test.dart | 88 +++++++++++++++++- 3 files changed, 139 insertions(+), 59 deletions(-) diff --git a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart index c334bf2cdc..c20db8c60b 100644 --- a/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart +++ b/packages/logging_cloudwatch/amplify_logging_cloudwatch/lib/src/queued_item_store/dart_queued_item_store.web.dart @@ -34,8 +34,7 @@ class DartQueuedItemStore String timestamp, { bool enableQueueRotation = false, }) async { - final db = _database; - await db.addItem( + await _database.addItem( string, timestamp, enableQueueRotation: enableQueueRotation, @@ -44,34 +43,29 @@ class DartQueuedItemStore @override Future deleteItems(Iterable items) async { - final db = _database; - await db.deleteItems(items); + await _database.deleteItems(items); } @override Future> getCount(int count) async { - final db = _database; - return db.getCount(count); + return _database.getCount(count); } @override Future> getAll() async { - final db = _database; - return db.getAll(); + return _database.getAll(); } @override bool isFull(int maxSizeInMB) { - final db = _database; - return db.isFull(maxSizeInMB); + return _database.isFull(maxSizeInMB); } /// Clear IndexedDB data. @override @visibleForTesting Future clear() async { - final db = _database; - return db.clear(); + return _database.clear(); } @override diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart index c33e8649fe..210a676ddf 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart @@ -146,52 +146,36 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin while (logs.isNotEmpty && events.isNotEmpty) { final rejectedLogEventsInfo = (await _sendToCloudWatch(events)).rejectedLogEventsInfo; - final expiredLogEventEndIndex = - rejectedLogEventsInfo?.expiredLogEventEndIndex; - final tooOldLogEventEndIndex = - rejectedLogEventsInfo?.tooOldLogEventEndIndex; - final tooNewLogEventStartIndex = - rejectedLogEventsInfo?.tooNewLogEventStartIndex; - - if (expiredLogEventEndIndex != null && - expiredLogEventEndIndex >= 0 && - expiredLogEventEndIndex <= events.length - 1) { - // delete old logs from log store - await _logStore - .deleteItems(logs.sublist(0, expiredLogEventEndIndex + 1)); - // set logs to start from next - logs.removeRange(0, expiredLogEventEndIndex + 1); - // set events to start from next - events.removeRange(0, expiredLogEventEndIndex + 1); - continue; - } - if (tooOldLogEventEndIndex != null && - tooOldLogEventEndIndex >= 0 && - tooOldLogEventEndIndex <= events.length - 1) { - // delete old logs from log store - await _logStore - .deleteItems(logs.sublist(0, tooOldLogEventEndIndex + 1)); - // set logs to start from next - logs.removeRange(0, tooOldLogEventEndIndex + 1); - // set events to start from next - events.removeRange(0, tooOldLogEventEndIndex + 1); - continue; + if (rejectedLogEventsInfo == null) { + await _logStore.deleteItems(logs); + break; } - if (tooNewLogEventStartIndex != null && - tooNewLogEventStartIndex >= 0 && - tooNewLogEventStartIndex <= events.length - 1) { - tooNewException = _TooNewLogEventException( - events[tooNewLogEventStartIndex].timestamp.toInt(), - ); - // set logs to end at the index - logs.removeRange(tooNewLogEventStartIndex, logs.length); - // set events to end at the index - events.removeRange(tooNewLogEventStartIndex, events.length); + { + final (tooOldEndIndex, tooNewStartIndex) = + rejectedLogEventsInfo.parse(events.length); + + if (_isValidIndex(tooNewStartIndex, events.length)) { + tooNewException = _TooNewLogEventException( + events[tooNewStartIndex!].timestamp.toInt(), + ); + // set logs to end before the index. + logs.removeRange(tooNewStartIndex, events.length); + // set events to end before the index. + events.removeRange(tooNewStartIndex, events.length); + } + if (_isValidIndex(tooOldEndIndex, events.length)) { + // remove old logs from log store. + await _logStore.deleteItems(logs.sublist(0, tooOldEndIndex! + 1)); + // set logs to start after the index. + logs.removeRange(0, tooOldEndIndex + 1); + // set events to start after the index. + events.removeRange(0, tooOldEndIndex + 1); + } continue; } - await _logStore.deleteItems(logs); - break; } + // after sending each batch to CloudWatch check if the batch has + // `tooNewException` and throw to stop syncing next batches. if (tooNewException != null) { throw tooNewException; } @@ -386,9 +370,33 @@ extension on LogEntry { } } +extension on RejectedLogEventsInfo { + (int? pastEndIndex, int? futureStartIndex) parse(int length) { + int? pastEndIndex; + int? futureStartIndex; + + if (_isValidIndex(tooOldLogEventEndIndex, length)) { + pastEndIndex = tooOldLogEventEndIndex; + } + if (_isValidIndex(expiredLogEventEndIndex, length)) { + pastEndIndex = pastEndIndex == null + ? expiredLogEventEndIndex + : max(pastEndIndex, expiredLogEventEndIndex!); + } + if (_isValidIndex(tooNewLogEventStartIndex, length)) { + futureStartIndex = tooNewLogEventStartIndex; + } + return (pastEndIndex, futureStartIndex); + } +} + class _TooNewLogEventException implements Exception { const _TooNewLogEventException( this.timeInMillisecondsSinceEpoch, ); final int timeInMillisecondsSinceEpoch; } + +bool _isValidIndex(int? index, int length) { + return index != null && index >= 0 && index <= length - 1; +} diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart index e1980dd168..4e4f57f39e 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart @@ -45,7 +45,17 @@ void main() { ), QueuedItem( id: 2, - value: 'second og message', + value: 'second log message', + timestamp: DateTime.timestamp().toIso8601String(), + ), + QueuedItem( + id: 3, + value: 'third log message', + timestamp: DateTime.timestamp().toIso8601String(), + ), + QueuedItem( + id: 4, + value: 'forth log message', timestamp: DateTime.timestamp().toIso8601String(), ), ]; @@ -518,8 +528,8 @@ void main() { queuedItems.first, ); expect( - (captures.captured.last as Iterable).first, - queuedItems.last, + (captures.captured.last as Iterable), + queuedItems.sublist(1), ); }, ); @@ -581,8 +591,8 @@ void main() { queuedItems.first, ); expect( - (captures.captured.last as Iterable).first, - queuedItems.last, + (captures.captured.last as Iterable), + queuedItems.sublist(1), ); }); @@ -644,5 +654,73 @@ void main() { queuedItems.first, ); }); + + test( + 'it deltes old logs and leaves too new logs in the batch' + ' and sync the rest', () async { + final responses = [ + PutLogEventsResponse( + rejectedLogEventsInfo: RejectedLogEventsInfo( + expiredLogEventEndIndex: 0, + tooOldLogEventEndIndex: 1, + tooNewLogEventStartIndex: 3, + ), + ), + PutLogEventsResponse(), + ]; + + when( + () => mockPutLogEventsOperation.result, + ).thenAnswer( + (_) async { + final response = responses.removeAt(0); + return Future.value(response); + }, + ); + + when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer( + (_) => mockPutLogEventsOperation, + ); + + when(() => mockCloudWatchLogStreamProvider.defaultLogStream) + .thenAnswer((_) async => Future.value('log stream name')); + + when( + () => mockQueuedItemStore.isFull(pluginConfig.localStoreMaxSizeInMB), + ).thenReturn(false); + + when(() => mockQueuedItemStore.deleteItems(any())) + .thenAnswer((_) async => {}); + + when(() => mockQueuedItemStore.getAll()).thenAnswer( + (_) async => Future>.value(queuedItems), + ); + plugin.enable(); + + await expectLater( + plugin.flushLogs(), + completes, + ); + + verify( + () => mockCloudWatchLogsClient.putLogEvents(any()), + ).called(2); + + final captures = verify( + () => + mockQueuedItemStore.deleteItems(captureAny>()), + ); + + expect(captures.callCount, 2); + expect( + (captures.captured.first as Iterable), + queuedItems.sublist(0, 2), + ); + + expect( + (captures.captured.last as Iterable), + queuedItems.sublist(2, 3), + ); + }); }); } From c4eb744c40e20d23c0f8b0d6e761aad5d46cd690 Mon Sep 17 00:00:00 2001 From: Nika Hassani Date: Fri, 8 Sep 2023 16:17:58 -0700 Subject: [PATCH 3/4] use datetime and duration instead of int miliseconds since epoch --- .../lib/src/cloudwatch_logger_plugin.dart | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart index 210a676ddf..dff2539f6b 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart @@ -34,9 +34,8 @@ const int _maxLogEventsBatchSize = 1048576; const int _baseBufferSize = 26; const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay; const int _maxLogEventSize = 256000; -const int _maxLogEventTimeInFutureInMilliseconds = - Duration.millisecondsPerHour * 2; -const int _baseRetryIntervalInMilliseconds = Duration.millisecondsPerMinute; +const Duration _minusMaxLogEventTimeInFuture = Duration(hours: -2); +const Duration _baseRetryInterval = Duration(minutes: 1); typedef _LogBatch = (List logQueues, List logEvents); @@ -126,7 +125,7 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin StoppableTimer? _timer; RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider; int _retryCount = 0; - int? _retryTimeInMillisecondsSinceEpoch; + DateTime? _retryTime; set remoteLoggingConstraintProvider( RemoteLoggingConstraintProvider remoteProvider, ) { @@ -184,17 +183,18 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin if (!_syncing) { _syncing = true; - var nextRetry = 0; + DateTime? nextRetry; try { await startSyncing(); } on _TooNewLogEventException catch (e) { - nextRetry = e.timeInMillisecondsSinceEpoch - - _maxLogEventTimeInFutureInMilliseconds; + nextRetry = + DateTime.fromMillisecondsSinceEpoch(e.timeInMillisecondsSinceEpoch) + .add(_minusMaxLogEventTimeInFuture); } on Exception catch (e) { logger.error('Failed to sync logs to CloudWatch.', e); } finally { _handleFullLogStoreAfterSync( - retryTimeInMillisecondsSinceEpoch: nextRetry, + retryTime: nextRetry, ); _syncing = false; } @@ -202,32 +202,28 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin } void _handleFullLogStoreAfterSync({ - int retryTimeInMillisecondsSinceEpoch = 0, + DateTime? retryTime, }) { final isLogStoreFull = _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); if (!isLogStoreFull) { _retryCount = 0; - _retryTimeInMillisecondsSinceEpoch = null; + _retryTime = null; return; } - if (retryTimeInMillisecondsSinceEpoch > 0 && - retryTimeInMillisecondsSinceEpoch > - DateTime.now().millisecondsSinceEpoch) { - _retryTimeInMillisecondsSinceEpoch = retryTimeInMillisecondsSinceEpoch; + if (retryTime != null && retryTime.isAfter(DateTime.timestamp())) { + _retryTime = retryTime; return; } _retryCount += 1; - _retryTimeInMillisecondsSinceEpoch = DateTime.now().millisecondsSinceEpoch + - (_retryCount * _baseRetryIntervalInMilliseconds); + _retryTime = DateTime.timestamp().add((_baseRetryInterval * _retryCount)); } bool _shouldSyncOnFullLogStore() { - if (_retryTimeInMillisecondsSinceEpoch == null) { + if (_retryTime == null) { return true; } - return DateTime.now().millisecondsSinceEpoch >= - _retryTimeInMillisecondsSinceEpoch!; + return !(_retryTime!.isAfter(DateTime.timestamp())); } void _onTimerError(Object e) { @@ -302,8 +298,7 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin final item = logEntry.toQueuedItem(); final isLogStoreFull = _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB); - final shouldEnableQueueRotation = - isLogStoreFull && _retryTimeInMillisecondsSinceEpoch != null; + final shouldEnableQueueRotation = isLogStoreFull && _retryTime != null; await _logStore.addItem( item.value, @@ -336,7 +331,7 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin _timer?.stop(); await _logStore.clear(); _retryCount = 0; - _retryTimeInMillisecondsSinceEpoch = null; + _retryTime = null; } /// Sends logs on-demand to CloudWatch. From 8e2aa134143bb07d2067e47ae28a2b4405216d94 Mon Sep 17 00:00:00 2001 From: Nika Hassani Date: Mon, 11 Sep 2023 16:30:02 -0700 Subject: [PATCH 4/4] address review comments --- .../lib/src/cloudwatch_logger_plugin.dart | 44 +++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart index dff2539f6b..0f8591c8f3 100644 --- a/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart +++ b/packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart @@ -35,7 +35,7 @@ const int _baseBufferSize = 26; const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay; const int _maxLogEventSize = 256000; const Duration _minusMaxLogEventTimeInFuture = Duration(hours: -2); -const Duration _baseRetryInterval = Duration(minutes: 1); +const Duration _baseRetryInterval = Duration(seconds: 10); typedef _LogBatch = (List logQueues, List logEvents); @@ -149,28 +149,26 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin await _logStore.deleteItems(logs); break; } - { - final (tooOldEndIndex, tooNewStartIndex) = - rejectedLogEventsInfo.parse(events.length); - - if (_isValidIndex(tooNewStartIndex, events.length)) { - tooNewException = _TooNewLogEventException( - events[tooNewStartIndex!].timestamp.toInt(), - ); - // set logs to end before the index. - logs.removeRange(tooNewStartIndex, events.length); - // set events to end before the index. - events.removeRange(tooNewStartIndex, events.length); - } - if (_isValidIndex(tooOldEndIndex, events.length)) { - // remove old logs from log store. - await _logStore.deleteItems(logs.sublist(0, tooOldEndIndex! + 1)); - // set logs to start after the index. - logs.removeRange(0, tooOldEndIndex + 1); - // set events to start after the index. - events.removeRange(0, tooOldEndIndex + 1); - } - continue; + + final (tooOldEndIndex, tooNewStartIndex) = + rejectedLogEventsInfo.parse(events.length); + + if (_isValidIndex(tooNewStartIndex, events.length)) { + tooNewException = _TooNewLogEventException( + events[tooNewStartIndex!].timestamp.toInt(), + ); + // set logs to end before the index. + logs.removeRange(tooNewStartIndex, events.length); + // set events to end before the index. + events.removeRange(tooNewStartIndex, events.length); + } + if (_isValidIndex(tooOldEndIndex, events.length)) { + // remove old logs from log store. + await _logStore.deleteItems(logs.sublist(0, tooOldEndIndex! + 1)); + // set logs to start after the index. + logs.removeRange(0, tooOldEndIndex + 1); + // set events to start after the index. + events.removeRange(0, tooOldEndIndex + 1); } } // after sending each batch to CloudWatch check if the batch has