Skip to content

Commit

Permalink
chore: changed export from amplify_core to import from cloudwatch_log…
Browse files Browse the repository at this point in the history
…ger_plugin.dart directly. Also moved identifyCall method around the _startSyncingIfNotInProgress
  • Loading branch information
khatruong2009 committed Sep 22, 2023
1 parent a96d9f0 commit 00171b4
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 59 deletions.
3 changes: 0 additions & 3 deletions packages/amplify_core/lib/amplify_core.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// ignore_for_file: invalid_export_of_internal_element

library amplify_core;

import 'package:amplify_core/src/amplify_class.dart';
Expand Down Expand Up @@ -30,7 +28,6 @@ export 'src/config/notifications/notifications_config.dart';
export 'src/config/storage/storage_config.dart';

/// HTTP
export 'src/http/amplify_category_method.dart';
export 'src/http/amplify_http_client.dart';
export 'src/http/amplify_user_agent.dart';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import 'dart:async';
import 'dart:math';

import 'package:amplify_core/amplify_core.dart';
// ignore: invalid_use_of_internal_member, implementation_imports
import 'package:amplify_core/src/http/amplify_category_method.dart';
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:aws_logging_cloudwatch/src/stoppable_timer.dart';
Expand Down Expand Up @@ -79,7 +81,10 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
_timer = pluginConfig.flushInterval > Duration.zero
? StoppableTimer(
duration: pluginConfig.flushInterval,
callback: _startSyncingIfNotInProgress,
callback: identifyCall(
LoggingCategoryMethod.flush,
() => _startSyncingIfNotInProgress,
),
onError: _onTimerError,
)
: null;
Expand Down Expand Up @@ -126,70 +131,68 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
}

Future<void> _startSyncingIfNotInProgress() async {
await identifyCall(LoggingCategoryMethod.batchSend, () async {
Future<void> startSyncing() async {
final batchStream = _getLogBatchesToSync();

await for (final (logs, events) in batchStream) {
_TooNewLogEventException? tooNewException;

while (logs.isNotEmpty && events.isNotEmpty) {
final rejectedLogEventsInfo =
(await _sendToCloudWatch(events)).rejectedLogEventsInfo;

if (rejectedLogEventsInfo == null) {
await _logStore.deleteItems(logs);
break;
}

final (tooOldEndIndex, tooNewStartIndex) =
rejectedLogEventsInfo.parse(events.length);

if (_isValidIndex(tooNewStartIndex, events.length)) {
tooNewException = _TooNewLogEventException(
events[tooNewStartIndex!].timestamp.toInt(),
);

logs.removeRange(tooNewStartIndex, events.length);
events.removeRange(tooNewStartIndex, events.length);
}

if (_isValidIndex(tooOldEndIndex, events.length)) {
await _logStore.deleteItems(
logs.sublist(0, tooOldEndIndex! + 1),
);

logs.removeRange(0, tooOldEndIndex + 1);
events.removeRange(0, tooOldEndIndex + 1);
}
Future<void> startSyncing() async {
final batchStream = _getLogBatchesToSync();

await for (final (logs, events) in batchStream) {
_TooNewLogEventException? tooNewException;

while (logs.isNotEmpty && events.isNotEmpty) {
final rejectedLogEventsInfo =
(await _sendToCloudWatch(events)).rejectedLogEventsInfo;

if (rejectedLogEventsInfo == null) {
await _logStore.deleteItems(logs);
break;
}

final (tooOldEndIndex, tooNewStartIndex) =
rejectedLogEventsInfo.parse(events.length);

if (_isValidIndex(tooNewStartIndex, events.length)) {
tooNewException = _TooNewLogEventException(
events[tooNewStartIndex!].timestamp.toInt(),
);

logs.removeRange(tooNewStartIndex, events.length);
events.removeRange(tooNewStartIndex, events.length);
}

if (tooNewException != null) {
throw tooNewException;
if (_isValidIndex(tooOldEndIndex, events.length)) {
await _logStore.deleteItems(
logs.sublist(0, tooOldEndIndex! + 1),
);

logs.removeRange(0, tooOldEndIndex + 1);
events.removeRange(0, tooOldEndIndex + 1);
}
}

if (tooNewException != null) {
throw tooNewException;
}
}
}

if (!_syncing) {
_syncing = true;
if (!_syncing) {
_syncing = true;

DateTime? nextRetry;
DateTime? nextRetry;

try {
await startSyncing();
} on _TooNewLogEventException catch (e) {
nextRetry = DateTime.fromMillisecondsSinceEpoch(
e.timeInMillisecondsSinceEpoch,
).add(_minusMaxLogEventTimeInFuture);
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
} finally {
_handleFullLogStoreAfterSync(retryTime: nextRetry);
try {
await startSyncing();
} on _TooNewLogEventException catch (e) {
nextRetry = DateTime.fromMillisecondsSinceEpoch(
e.timeInMillisecondsSinceEpoch,
).add(_minusMaxLogEventTimeInFuture);
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
} finally {
_handleFullLogStoreAfterSync(retryTime: nextRetry);

_syncing = false;
}
_syncing = false;
}
});
}
}

void _handleFullLogStoreAfterSync({
Expand Down Expand Up @@ -298,7 +301,11 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
);

if (isLogStoreFull && _shouldSyncOnFullLogStore()) {
await _startSyncingIfNotInProgress();
await identifyCall(
LoggingCategoryMethod.flush,
_startSyncingIfNotInProgress,
);
// await _startSyncingIfNotInProgress();
}
}

Expand Down

0 comments on commit 00171b4

Please sign in to comment.