Skip to content

Commit

Permalink
Fix for _PushStatus Stuck 'running' when Count is Off (#4319)
Browse files Browse the repository at this point in the history
* Fix for _PushStatus stuck 'running' if count is off

* use 'count' for batches

* push worker test fix
  • Loading branch information
montymxb authored and flovilmart committed Nov 5, 2017
1 parent 842343a commit c1a7347
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 9 deletions.
257 changes: 257 additions & 0 deletions spec/Parse.Push.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,261 @@ describe('Parse.Push', () => {
done();
});
});

const successfulAny = function(body, installations) {
const promises = installations.map((device) => {
return Promise.resolve({
transmitted: true,
device: device,
})
});

return Promise.all(promises);
};

const provideInstallations = function(num) {
if(!num) {
num = 2;
}

const installations = [];
while(installations.length !== num) {
// add Android installations
const installation = new Parse.Object("_Installation");
installation.set("installationId", "installation_" + installations.length);
installation.set("deviceToken","device_token_" + installations.length);
installation.set("deviceType", "android");
installations.push(installation);
}

return installations;
};

const losingAdapter = {
send: function(body, installations) {
// simulate having lost an installation before this was called
// thus invalidating our 'count' in _PushStatus
installations.pop();

return successfulAny(body, installations);
},
getValidPushTypes: function() {
return ["android"];
}
};

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates a simple push where 1 installation is removed between _PushStatus
* count being set and the pushes being sent
*/
it('does not get stuck with _PushStatus \'running\' on 1 installation lost', (done) => {
reconfigureServer({
push: {adapter: losingAdapter}
}).then(() => {
return Parse.Object.saveAll(provideInstallations());
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: 'android'}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
expect(result.get('numSent')).toEqual(1);
expect(result.get('count')).toEqual(undefined);
done();
});
});

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates a simple push where 1 installation is added between _PushStatus
* count being set and the pushes being sent
*/
it('does not get stuck with _PushStatus \'running\' on 1 installation added', (done) => {
const installations = provideInstallations();

// add 1 iOS installation which we will omit & add later on
const iOSInstallation = new Parse.Object("_Installation");
iOSInstallation.set("installationId", "installation_" + installations.length);
iOSInstallation.set("deviceToken","device_token_" + installations.length);
iOSInstallation.set("deviceType", "ios");
installations.push(iOSInstallation);

reconfigureServer({
push: {
adapter: {
send: function(body, installations) {
// simulate having added an installation before this was called
// thus invalidating our 'count' in _PushStatus
installations.push(iOSInstallation);

return successfulAny(body, installations);
},
getValidPushTypes: function() {
return ["android"];
}
}
}
}).then(() => {
return Parse.Object.saveAll(installations);
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: {'$ne' : 'random'}}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
expect(result.get('numSent')).toEqual(3);
expect(result.get('count')).toEqual(undefined);
done();
});
});

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates an extended push, where some installations may be removed,
* resulting in a non-zero count
*/
it('does not get stuck with _PushStatus \'running\' on many installations removed', (done) => {
const devices = 1000;
const installations = provideInstallations(devices);

reconfigureServer({
push: {adapter: losingAdapter}
}).then(() => {
return Parse.Object.saveAll(installations);
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: 'android'}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
// expect # less than # of batches used, assuming each batch is 100 pushes
expect(result.get('numSent')).toEqual(devices - (devices / 100));
expect(result.get('count')).toEqual(undefined);
done();
});
});

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates an extended push, where some installations may be added,
* resulting in a non-zero count
*/
it('does not get stuck with _PushStatus \'running\' on many installations added', (done) => {
const devices = 1000;
const installations = provideInstallations(devices);

// add 1 iOS installation which we will omit & add later on
const iOSInstallations = [];

while(iOSInstallations.length !== (devices / 100)) {
const iOSInstallation = new Parse.Object("_Installation");
iOSInstallation.set("installationId", "installation_" + installations.length);
iOSInstallation.set("deviceToken", "device_token_" + installations.length);
iOSInstallation.set("deviceType", "ios");
installations.push(iOSInstallation);
iOSInstallations.push(iOSInstallation);
}

reconfigureServer({
push: {
adapter: {
send: function(body, installations) {
// simulate having added an installation before this was called
// thus invalidating our 'count' in _PushStatus
installations.push(iOSInstallations.pop());

return successfulAny(body, installations);
},
getValidPushTypes: function() {
return ["android"];
}
}
}
}).then(() => {
return Parse.Object.saveAll(installations);
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: {'$ne' : 'random'}}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
// expect # less than # of batches used, assuming each batch is 100 pushes
expect(result.get('numSent')).toEqual(devices + (devices / 100));
expect(result.get('count')).toEqual(undefined);
done();
});
});
});
4 changes: 2 additions & 2 deletions spec/PushWorker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ describe('PushWorker', () => {
'failedPerType.ios': { __op: 'Increment', amount: 1 },
[`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
[`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
count: { __op: 'Increment', amount: -2 },
count: { __op: 'Increment', amount: -1 }
});
const query = new Parse.Query('_PushStatus');
return query.get(handler.objectId, { useMasterKey: true });
Expand Down Expand Up @@ -354,7 +354,7 @@ describe('PushWorker', () => {
'failedPerType.ios': { __op: 'Increment', amount: 1 },
[`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
[`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
count: { __op: 'Increment', amount: -2 },
count: { __op: 'Increment', amount: -1 }
});
done();
});
Expand Down
2 changes: 1 addition & 1 deletion src/Controllers/SchemaController.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const defaultColumns = Object.freeze({
"failedPerType": {type:'Object'},
"sentPerUTCOffset": {type:'Object'},
"failedPerUTCOffset": {type:'Object'},
"count": {type:'Number'}
"count": {type:'Number'} // tracks # of batches queued and pending
},
_JobStatus: {
"jobName": {type: 'String'},
Expand Down
2 changes: 1 addition & 1 deletion src/Push/PushQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class PushQueue {
if (!results || count == 0) {
return Promise.reject({error: 'PushController: no results in query'})
}
pushStatus.setRunning(count);
pushStatus.setRunning(Math.ceil(count / limit));
let skip = 0;
while (skip < count) {
const query = { where,
Expand Down
20 changes: 15 additions & 5 deletions src/StatusHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,18 @@ export function pushStatusHandler(config, existingObjectId) {
});
}

const setRunning = function(count) {
logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count);
return handler.update({status:"pending", objectId: objectId},
{status: "running", count });
const setRunning = function(batches) {
logger.verbose(`_PushStatus ${objectId}: sending push to installations with %d batches`, batches);
return handler.update(
{
status:"pending",
objectId: objectId
},
{
status: "running",
count: batches
}
);
}

const trackSent = function(results, UTCOffset, cleanupInstallations = process.env.PARSE_SERVER_CLEANUP_INVALID_INSTALLATIONS) {
Expand Down Expand Up @@ -235,7 +243,6 @@ export function pushStatusHandler(config, existingObjectId) {
}
return memo;
}, update);
incrementOp(update, 'count', -results.length);
}

logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed);
Expand All @@ -259,6 +266,9 @@ export function pushStatusHandler(config, existingObjectId) {
});
}

// indicate this batch is complete
incrementOp(update, 'count', -1);

return handler.update({ objectId }, update).then((res) => {
if (res && res.count === 0) {
return this.complete();
Expand Down

0 comments on commit c1a7347

Please sign in to comment.