Skip to content

Commit

Permalink
fix(pubsub): Connection Ack verification bug (#10200)
Browse files Browse the repository at this point in the history
* fex(pubsub): Add distinct RN Reachibility implementation

* fix(PubSub): Monitor tests

* fix(pubsub): Connection Ack verification bug

* Fix the test

* fix: Add tests to cover the fixed bug

* Update packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts

Co-authored-by: Francisco Rodriguez <elorzafe@amazon.com>

* Update packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts

Co-authored-by: Francisco Rodriguez <elorzafe@amazon.com>

* Test fix

* Fix test

Co-authored-by: Francisco Rodriguez <elorzafe@amazon.com>
  • Loading branch information
stocaaro and elorzafe authored Aug 18, 2022
1 parent f28918b commit d6cb7f9
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 38 deletions.
80 changes: 59 additions & 21 deletions packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,11 @@ describe('AWSAppSyncRealTimeProvider', () => {

test('subscription fails when onerror triggered while waiting for onopen', async () => {
expect.assertions(1);

provider
.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
})
.subscribe({ error: () => {} });

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerError();
expect(loggerSpy).toHaveBeenCalledWith(
Expand Down Expand Up @@ -303,16 +301,17 @@ describe('AWSAppSyncRealTimeProvider', () => {

test('subscription fails when onerror triggered while waiting for handshake', async () => {
expect.assertions(1);
await replaceConstant('CONNECTION_INIT_TIMEOUT', 20, async () => {
provider
.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
})
.subscribe({ error: () => {} });

provider
.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
})
.subscribe({ error: () => {} });

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.triggerError();
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.triggerError();
});
// When the socket throws an error during handshake
expect(loggerSpy).toHaveBeenCalledWith(
'DEBUG',
Expand All @@ -323,15 +322,17 @@ describe('AWSAppSyncRealTimeProvider', () => {
test('subscription fails when onclose triggered while waiting for handshake', async () => {
expect.assertions(1);

provider
.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
})
.subscribe({ error: () => {} });
await replaceConstant('CONNECTION_INIT_TIMEOUT', 20, async () => {
provider
.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
})
.subscribe({ error: () => {} });

await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.triggerClose();
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.triggerClose();
});

// When the socket is closed during handshake
// Watching for raised exception to be caught and logged
Expand Down Expand Up @@ -662,7 +663,44 @@ describe('AWSAppSyncRealTimeProvider', () => {
});
});

test('connection init timeout', async () => {
test('connection init timeout met', async () => {
expect.assertions(2);
await replaceConstant('CONNECTION_INIT_TIMEOUT', 20, async () => {
const observer = provider.subscribe('test', {
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

const subscription = observer.subscribe({ error: () => {} });

await fakeWebSocketInterface?.readyForUse;
Promise.resolve();
await fakeWebSocketInterface?.triggerOpen();
Promise.resolve();
await fakeWebSocketInterface?.handShakeMessage();

// Wait no less than 20 ms
await delay(20);

// Wait until the socket is automatically disconnected
await expect(
fakeWebSocketInterface?.hubConnectionListener
?.currentConnectionState
).toBe(CS.Connecting);

// Watching for raised exception to be caught and logged
expect(loggerSpy).not.toBeCalledWith(
'DEBUG',
'error on bound ',
expect.objectContaining({
message: expect.stringMatching(
'Connection timeout: ack from AWSAppSyncRealTime was not received after'
),
})
);
});
});

test('connection init timeout missed', async () => {
expect.assertions(1);

await replaceConstant('CONNECTION_INIT_TIMEOUT', 20, async () => {
Expand Down Expand Up @@ -690,7 +728,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
'error on bound ',
expect.objectContaining({
message: expect.stringMatching(
'Connection timeout: ack from AWSRealTime'
'Connection timeout: ack from AWSAppSyncRealTime was not received after'
),
})
);
Expand Down
5 changes: 1 addition & 4 deletions packages/pubsub/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,7 @@ export class FakeWebSocketInterface {
await new Promise(async res => {
// The interface is closed when the socket "hasClosed"
this.hasClosed.then(() => res(undefined));
await this.waitUntilConnectionStateIn([
CS.Disconnected,
CS.ConnectionDisrupted,
]);
await this.waitUntilConnectionStateIn([CS.Disconnected]);
res(undefined);
});
}
Expand Down
26 changes: 13 additions & 13 deletions packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -747,20 +747,20 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
};
this.awsRealTimeSocket.send(JSON.stringify(gqlInit));

setTimeout(checkAckOk.bind(this, ackOk), CONNECTION_INIT_TIMEOUT);
}
const checkAckOk = (ackOk: boolean) => {
if (!ackOk) {
this.connectionStateMonitor.record(
CONNECTION_CHANGE.CONNECTION_FAILED
);
rej(
new Error(
`Connection timeout: ack from AWSAppSyncRealTime was not received after ${CONNECTION_INIT_TIMEOUT} ms`
)
);
}
};

function checkAckOk(ackOk: boolean) {
if (!ackOk) {
this.connectionStateMonitor.record(
CONNECTION_CHANGE.CONNECTION_FAILED
);
rej(
new Error(
`Connection timeout: ack from AWSRealTime was not received on ${CONNECTION_INIT_TIMEOUT} ms`
)
);
}
setTimeout(() => checkAckOk(ackOk), CONNECTION_INIT_TIMEOUT);
}
});
})();
Expand Down

0 comments on commit d6cb7f9

Please sign in to comment.