Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle shutdown for RedisCacheAdapter #6658

Merged
merged 2 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions spec/RedisCacheAdapter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,103 +9,112 @@ and make sure a redis server is available on the default port
*/
describe_only(() => {
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
})('RedisCacheAdapter', function() {
})('RedisCacheAdapter', function () {
const KEY = 'hello';
const VALUE = 'world';

function wait(sleep) {
return new Promise(function(resolve) {
return new Promise(function (resolve) {
setTimeout(resolve, sleep);
});
}

it('should get/set/clear', done => {
it('should get/set/clear', (done) => {
const cache = new RedisCacheAdapter({
ttl: NaN,
});

cache
.put(KEY, VALUE)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then((value) => expect(value).toEqual(VALUE))
.then(() => cache.clear())
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(null))
.then((value) => expect(value).toEqual(null))
.then(done);
});

it('should expire after ttl', done => {
it('should expire after ttl', (done) => {
const cache = new RedisCacheAdapter(null, 50);

cache
.put(KEY, VALUE)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then((value) => expect(value).toEqual(VALUE))
.then(wait.bind(null, 52))
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(null))
.then((value) => expect(value).toEqual(null))
.then(done);
});

it('should not store value for ttl=0', done => {
it('should not store value for ttl=0', (done) => {
const cache = new RedisCacheAdapter(null, 5);

cache
.put(KEY, VALUE, 0)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(null))
.then((value) => expect(value).toEqual(null))
.then(done);
});

it('should not expire when ttl=Infinity', done => {
it('should not expire when ttl=Infinity', (done) => {
const cache = new RedisCacheAdapter(null, 1);

cache
.put(KEY, VALUE, Infinity)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then((value) => expect(value).toEqual(VALUE))
.then(wait.bind(null, 5))
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then((value) => expect(value).toEqual(VALUE))
.then(done);
});

it('should fallback to default ttl', done => {
it('should fallback to default ttl', (done) => {
const cache = new RedisCacheAdapter(null, 1);
let promise = Promise.resolve();

[-100, null, undefined, 'not number', true].forEach(ttl => {
[-100, null, undefined, 'not number', true].forEach((ttl) => {
promise = promise.then(() =>
cache
.put(KEY, VALUE, ttl)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then((value) => expect(value).toEqual(VALUE))
.then(wait.bind(null, 5))
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(null))
.then((value) => expect(value).toEqual(null))
);
});

promise.then(done);
});

it('should find un-expired records', done => {
it('should find un-expired records', (done) => {
const cache = new RedisCacheAdapter(null, 5);

cache
.put(KEY, VALUE)
.then(() => cache.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then((value) => expect(value).toEqual(VALUE))
.then(wait.bind(null, 1))
.then(() => cache.get(KEY))
.then(value => expect(value).not.toEqual(null))
.then((value) => expect(value).not.toEqual(null))
.then(done);
});

it('handleShutdown, close connection', async () => {
const cache = new RedisCacheAdapter(null, 5);

await cache.handleShutdown();
setTimeout(() => {
expect(cache.client.connected).toBe(false);
}, 0);
});
});

describe_only(() => {
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
})('RedisCacheAdapter/KeyPromiseQueue', function() {
})('RedisCacheAdapter/KeyPromiseQueue', function () {
const KEY1 = 'key1';
const KEY2 = 'key2';
const VALUE = 'hello';
Expand All @@ -120,7 +129,7 @@ describe_only(() => {
return Object.keys(cache.queue.queue).length;
}

it('it should clear completed operations from queue', done => {
it('it should clear completed operations from queue', (done) => {
const cache = new RedisCacheAdapter({ ttl: NaN });

// execute a bunch of operations in sequence
Expand All @@ -142,7 +151,7 @@ describe_only(() => {
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
});

it('it should count per key chained operations correctly', done => {
it('it should count per key chained operations correctly', (done) => {
const cache = new RedisCacheAdapter({ ttl: NaN });

let key1Promise = Promise.resolve();
Expand All @@ -168,7 +177,7 @@ describe_only(() => {

describe_only(() => {
return process.env.PARSE_SERVER_TEST_CACHE === 'redis';
})('Redis Performance', function() {
})('Redis Performance', function () {
let cacheAdapter;
let getSpy;
let putSpy;
Expand Down
36 changes: 25 additions & 11 deletions src/Adapters/Cache/RedisCacheAdapter/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function debug() {
logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]);
}

const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
const isValidTTL = (ttl) => typeof ttl === 'number' && ttl > 0;

export class RedisCacheAdapter {
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
Expand All @@ -18,13 +18,27 @@ export class RedisCacheAdapter {
this.queue = new KeyPromiseQueue();
}

handleShutdown() {
if (!this.client) {
return Promise.resolve();
}
return new Promise((resolve) => {
this.client.quit((err) => {
if (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
resolve();
});
});
}

get(key) {
debug('get', key);
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.get(key, function(err, res) {
new Promise((resolve) => {
this.client.get(key, function (err, res) {
debug('-> get', key, res);
if (!res) {
return resolve(null);
Expand All @@ -48,8 +62,8 @@ export class RedisCacheAdapter {
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.set(key, value, function() {
new Promise((resolve) => {
this.client.set(key, value, function () {
resolve();
});
})
Expand All @@ -63,8 +77,8 @@ export class RedisCacheAdapter {
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.psetex(key, ttl, value, function() {
new Promise((resolve) => {
this.client.psetex(key, ttl, value, function () {
resolve();
});
})
Expand All @@ -76,8 +90,8 @@ export class RedisCacheAdapter {
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.del(key, function() {
new Promise((resolve) => {
this.client.del(key, function () {
resolve();
});
})
Expand All @@ -89,8 +103,8 @@ export class RedisCacheAdapter {
return this.queue.enqueue(
FLUSH_DB_KEY,
() =>
new Promise(resolve => {
this.client.flushdb(function() {
new Promise((resolve) => {
this.client.flushdb(function () {
resolve();
});
})
Expand Down
28 changes: 16 additions & 12 deletions src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ParseServer {
serverStartComplete();
}
})
.catch(error => {
.catch((error) => {
if (serverStartComplete) {
serverStartComplete(error);
} else {
Expand Down Expand Up @@ -126,6 +126,10 @@ class ParseServer {
if (fileAdapter && typeof fileAdapter.handleShutdown === 'function') {
promises.push(fileAdapter.handleShutdown());
}
const { adapter: cacheAdapter } = this.config.cacheController;
if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') {
promises.push(cacheAdapter.handleShutdown());
}
return (promises.length > 0
? Promise.all(promises)
: Promise.resolve()
Expand Down Expand Up @@ -154,7 +158,7 @@ class ParseServer {
})
);

api.use('/health', function(req, res) {
api.use('/health', function (req, res) {
res.json({
status: 'ok',
});
Expand All @@ -179,7 +183,7 @@ class ParseServer {
if (!process.env.TESTING) {
//This causes tests to spew some useless warnings, so disable in test
/* istanbul ignore next */
process.on('uncaughtException', err => {
process.on('uncaughtException', (err) => {
if (err.code === 'EADDRINUSE') {
// user-friendly message for this common error
process.stderr.write(
Expand All @@ -192,7 +196,7 @@ class ParseServer {
});
// verify the server url after a 'mount' event is received
/* istanbul ignore next */
api.on('mount', function() {
api.on('mount', function () {
ParseServer.verifyServerUrl();
});
}
Expand Down Expand Up @@ -334,8 +338,8 @@ class ParseServer {
if (Parse.serverURL) {
const request = require('./request');
request({ url: Parse.serverURL.replace(/\/$/, '') + '/health' })
.catch(response => response)
.then(response => {
.catch((response) => response)
.then((response) => {
const json = response.data || null;
if (
response.status !== 200 ||
Expand Down Expand Up @@ -368,7 +372,7 @@ function addParseCloud() {
}

function injectDefaults(options: ParseServerOptions) {
Object.keys(defaults).forEach(key => {
Object.keys(defaults).forEach((key) => {
if (!Object.prototype.hasOwnProperty.call(options, key)) {
options[key] = defaults[key];
}
Expand Down Expand Up @@ -424,12 +428,12 @@ function injectDefaults(options: ParseServerOptions) {
}

// Merge protectedFields options with defaults.
Object.keys(defaults.protectedFields).forEach(c => {
Object.keys(defaults.protectedFields).forEach((c) => {
const cur = options.protectedFields[c];
if (!cur) {
options.protectedFields[c] = defaults.protectedFields[c];
} else {
Object.keys(defaults.protectedFields[c]).forEach(r => {
Object.keys(defaults.protectedFields[c]).forEach((r) => {
const unq = new Set([
...(options.protectedFields[c][r] || []),
...defaults.protectedFields[c][r],
Expand All @@ -453,15 +457,15 @@ function configureListeners(parseServer) {
const sockets = {};
/* Currently, express doesn't shut down immediately after receiving SIGINT/SIGTERM if it has client connections that haven't timed out. (This is a known issue with node - https://github.com/nodejs/node/issues/2642)
This function, along with `destroyAliveConnections()`, intend to fix this behavior such that parse server will close all open connections and initiate the shutdown process as soon as it receives a SIGINT/SIGTERM signal. */
server.on('connection', socket => {
server.on('connection', (socket) => {
const socketId = socket.remoteAddress + ':' + socket.remotePort;
sockets[socketId] = socket;
socket.on('close', () => {
delete sockets[socketId];
});
});

const destroyAliveConnections = function() {
const destroyAliveConnections = function () {
for (const socketId in sockets) {
try {
sockets[socketId].destroy();
Expand All @@ -471,7 +475,7 @@ function configureListeners(parseServer) {
}
};

const handleShutdown = function() {
const handleShutdown = function () {
process.stdout.write('Termination signal received. Shutting down.');
destroyAliveConnections();
server.close();
Expand Down