Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rework error handler #9861

Merged
merged 12 commits into from
May 3, 2022
4 changes: 2 additions & 2 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ function defaultConflictHandler(conflictData: SyncConflict): PersistentModel {
return modelInstanceCreator(modelConstructor, { ...localModel, _version });
}

function defaultErrorHandler(error: SyncError) {
function defaultErrorHandler(error: SyncError<PersistentModel>): void {
logger.warn(error);
}

Expand Down Expand Up @@ -686,7 +686,7 @@ class DataStore {
private amplifyConfig: Record<string, any> = {};
private authModeStrategy: AuthModeStrategy;
private conflictHandler: ConflictHandler;
private errorHandler: (error: SyncError) => void;
private errorHandler: (error: SyncError<PersistentModel>) => void;
private fullSyncInterval: number;
private initialized: Promise<void>;
private initReject: Function;
Expand Down
6 changes: 4 additions & 2 deletions packages/datastore/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,15 @@ export class SyncEngine {
this.schema,
this.syncPredicates,
this.amplifyConfig,
this.authModeStrategy
this.authModeStrategy,
errorHandler
);
this.subscriptionsProcessor = new SubscriptionProcessor(
this.schema,
this.syncPredicates,
this.amplifyConfig,
this.authModeStrategy
this.authModeStrategy,
errorHandler
);
this.mutationsProcessor = new MutationProcessor(
this.schema,
Expand Down
18 changes: 12 additions & 6 deletions packages/datastore/src/sync/processors/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
PersistentModelConstructor,
SchemaModel,
TypeConstructorMap,
ProcessName,
} from '../../types';
import { exhaustiveCheck, USER } from '../../util';
import { MutationEventOutbox } from '../outbox';
Expand All @@ -33,6 +34,8 @@ import {
getModelAuthModes,
TransformerMutationType,
getTokenForCustomAuth,
ErrorMap,
mapErrorToType,
} from '../utils';

const MAX_ATTEMPTS = 10;
Expand Down Expand Up @@ -372,21 +375,24 @@ class MutationProcessor {
throw new NonRetryableError('RetryMutation');
} else {
try {
const errorMap = {
BadRecord: error =>
/^Cannot return \w+ for [\w-_]+ type/.test(error.message),
} as ErrorMap;
await this.errorHandler({
dpilch marked this conversation as resolved.
Show resolved Hide resolved
localModel: this.modelInstanceCreator(
modelConstructor,
variables.input
),
localModel: variables.input,
message: error.message,
operation,
errorType: error.errorType,
errorType: mapErrorToType(errorMap, error),
errorInfo: error.errorInfo,
process: ProcessName.mutate,
cause: error,
remoteModel: error.data
? this.modelInstanceCreator(modelConstructor, error.data)
: null,
});
} catch (err) {
logger.warn('failed to execute errorHandler', err);
logger.warn('Mutation error handler failed with:', err);
} finally {
// Return empty tuple, dequeues the mutation
return error.data
Expand Down
45 changes: 41 additions & 4 deletions packages/datastore/src/sync/processors/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
PredicatesGroup,
ModelPredicate,
AuthModeStrategy,
ErrorHandler,
ProcessName,
} from '../../types';
import {
buildSubscriptionGraphQLOperation,
Expand All @@ -20,6 +22,8 @@ import {
getUserGroupsFromToken,
TransformerMutationType,
getTokenForCustomAuth,
mapErrorToType,
ErrorMap,
} from '../utils';
import { ModelPredicateCreator } from '../../predicates';
import { validatePredicate } from '../../util';
Expand Down Expand Up @@ -56,7 +60,8 @@ class SubscriptionProcessor {
private readonly schema: InternalSchema,
private readonly syncPredicates: WeakMap<SchemaModel, ModelPredicate<any>>,
private readonly amplifyConfig: Record<string, any> = {},
private readonly authModeStrategy: AuthModeStrategy
private readonly authModeStrategy: AuthModeStrategy,
private readonly errorHandler?: ErrorHandler
) {}

private buildSubscription(
Expand Down Expand Up @@ -436,7 +441,7 @@ class SubscriptionProcessor {
}
this.drainBuffer();
},
error: subscriptionError => {
error: async subscriptionError => {
const {
error: { errors: [{ message = '' } = {}] } = {
errors: [],
Expand Down Expand Up @@ -470,6 +475,37 @@ class SubscriptionProcessor {
}`
);
logger.warn('subscriptionError', message);
const errorMap = {
Unauthorized: (givenError: any) => {
const {
error: { errors: [{ message = '' } = {}] } = {
errors: [],
},
} = givenError;
const regex = /Connection failed.+Unauthorized/;
return regex.test(message);
},
} as ErrorMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each file, might be easier to read down the line to move these error maps out of the block into the top of the file(s) and/or a utils.

try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is partly what was I deliberating on earlier. I think we need to move this out near the top of the error callback. Let's log everything we can that comes back from appsync to start with here. We can cut back if we find we're being too verbose.

await this.errorHandler({
dpilch marked this conversation as resolved.
Show resolved Hide resolved
localModel: null,
message: message,
model: modelDefinition.name,
operation: operation,
errorType: mapErrorToType(
errorMap,
subscriptionError
),
process: ProcessName.subscribe,
remoteModel: null,
cause: subscriptionError,
});
} catch (e) {
logger.error(
'Sync error handler failed with:',
e
);
}
return;
} else {
logger.debug(
Expand All @@ -487,7 +523,7 @@ class SubscriptionProcessor {
return;
}
}

// Perhaps case for ErrorHandler
logger.warn('subscriptionError', message);

if (typeof subscriptionReadyCallback === 'function') {
Expand All @@ -498,9 +534,10 @@ class SubscriptionProcessor {
message.includes('"errorType":"Unauthorized"') ||
message.includes('"errorType":"OperationDisabled"')
) {
// Perhaps case for ErrorHandler
return;
}

// Perhaps case for ErrorHandler
observer.error(message);
},
})
Expand Down
34 changes: 30 additions & 4 deletions packages/datastore/src/sync/processors/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
PredicatesGroup,
GraphQLFilter,
AuthModeStrategy,
ErrorHandler,
ProcessName,
} from '../../types';
import {
buildGraphQLOperation,
Expand All @@ -16,6 +18,8 @@ import {
getForbiddenError,
predicateToGraphQLFilter,
getTokenForCustomAuth,
mapErrorToType,
ErrorMap,
} from '../utils';
import {
jitteredExponentialRetry,
Expand All @@ -24,7 +28,7 @@ import {
NonRetryableError,
} from '@aws-amplify/core';
import { ModelPredicateCreator } from '../../predicates';

import { ModelInstanceCreator } from '../../datastore/datastore';
const opResultDefaults = {
items: [],
nextToken: null,
Expand All @@ -40,7 +44,9 @@ class SyncProcessor {
private readonly schema: InternalSchema,
private readonly syncPredicates: WeakMap<SchemaModel, ModelPredicate<any>>,
private readonly amplifyConfig: Record<string, any> = {},
private readonly authModeStrategy: AuthModeStrategy
private readonly authModeStrategy: AuthModeStrategy,
private readonly errorHandler?: ErrorHandler,
private readonly modelInstanceCreator?: ModelInstanceCreator
) {
this.generateQueries();
}
Expand Down Expand Up @@ -223,15 +229,35 @@ class SyncProcessor {
error.data[opName] &&
error.data[opName].items
);

if (this.partialDataFeatureFlagEnabled()) {
if (hasItems) {
const result = error;
result.data[opName].items = result.data[opName].items.filter(
item => item !== null
);

if (error.errors) {
const errorMap = {
BadRecord: error =>
/^Cannot return \w+ for [\w-_]+ type/.test(error.message),
} as ErrorMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Probably move up/out.

await Promise.all(
error.error.map(async err => {
try {
await this.errorHandler({
dpilch marked this conversation as resolved.
Show resolved Hide resolved
localModel: null,
message: err.message,
model: modelDefinition.name,
operation: opName,
errorType: mapErrorToType(errorMap, err),
process: ProcessName.sync,
remoteModel: null,
cause: err,
});
} catch (e) {
logger.error('Sync error handler failed with:', e);
}
})
);
Hub.dispatch('datastore', {
event: 'syncQueriesPartialSyncError',
data: {
Expand Down
16 changes: 16 additions & 0 deletions packages/datastore/src/sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
ModelOperation,
InternalSchema,
AuthModeStrategy,
ErrorType,
} from '../types';
import { exhaustiveCheck } from '../util';
import { MutationEvent } from './';
Expand All @@ -40,6 +41,21 @@ enum GraphQLOperationType {
GET = 'query',
}

export type ErrorMap = {
[key in ErrorType]: (error: Error) => boolean;
};

export function mapErrorToType(errorMap: ErrorMap, error: Error): ErrorType {
dpilch marked this conversation as resolved.
Show resolved Hide resolved
const errorTypes = [...Object.keys(errorMap)] as ErrorType[];
for (const errorType of errorTypes) {
const matcher = errorMap[errorType];
if (matcher(error)) {
return errorType;
}
}
return 'Unknown';
}

export enum TransformerMutationType {
CREATE = 'Create',
UPDATE = 'Update',
Expand Down
32 changes: 24 additions & 8 deletions packages/datastore/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ export type DataStoreConfig = {
DataStore?: {
authModeStrategyType?: AuthModeStrategyType;
conflictHandler?: ConflictHandler; // default : retry until client wins up to x times
errorHandler?: (error: SyncError) => void; // default : logger.warn
errorHandler?: (error: SyncError<PersistentModel>) => void; // default : logger.warn
maxRecordsToSync?: number; // merge
syncPageSize?: number;
fullSyncInterval?: number;
Expand All @@ -668,7 +668,7 @@ export type DataStoreConfig = {
};
authModeStrategyType?: AuthModeStrategyType;
conflictHandler?: ConflictHandler; // default : retry until client wins up to x times
errorHandler?: (error: SyncError) => void; // default : logger.warn
errorHandler?: (error: SyncError<PersistentModel>) => void; // default : logger.warn
maxRecordsToSync?: number; // merge
syncPageSize?: number;
fullSyncInterval?: number;
Expand Down Expand Up @@ -775,15 +775,31 @@ export type SyncConflict = {
attempts: number;
};

export type SyncError = {
export type SyncError<T extends PersistentModel> = {
message: string;
errorType: string;
errorInfo: string;
localModel: PersistentModel;
remoteModel: PersistentModel;
errorType: ErrorType;
errorInfo?: string;
dpilch marked this conversation as resolved.
Show resolved Hide resolved
model?: string;
localModel: T;
remoteModel: T;
process: ProcessName;
operation: string;
cause?: Error;
};

export type ErrorType =
| 'ConfigError'
| 'BadRecord'
| 'Unauthorized'
| 'Transient'
| 'Unknown';

export enum ProcessName {
'sync' = 'sync',
'mutate' = 'mutate',
'subscribe' = 'subscribe',
}

export const DISCARD = Symbol('DISCARD');

export type ConflictHandler = (
Expand All @@ -792,7 +808,7 @@ export type ConflictHandler = (
| Promise<PersistentModel | typeof DISCARD>
| PersistentModel
| typeof DISCARD;
export type ErrorHandler = (error: SyncError) => void;
export type ErrorHandler = (error: SyncError<PersistentModel>) => void;

export type DeferredCallbackResolverOptions = {
callback: () => void;
Expand Down