From e8d1cb03222e8f14c41ad9bc467fc3f01a163d00 Mon Sep 17 00:00:00 2001 From: Promise Xu Date: Wed, 29 Apr 2020 17:56:59 -0400 Subject: [PATCH 1/2] Handle shutdown for RedisCacheAdapter --- spec/RedisCacheAdapter.spec.js | 55 +++++++++++-------- src/Adapters/Cache/RedisCacheAdapter/index.js | 36 ++++++++---- src/ParseServer.js | 28 ++++++---- 3 files changed, 72 insertions(+), 47 deletions(-) diff --git a/spec/RedisCacheAdapter.spec.js b/spec/RedisCacheAdapter.spec.js index 9ae53cf688..11962c9181 100644 --- a/spec/RedisCacheAdapter.spec.js +++ b/spec/RedisCacheAdapter.spec.js @@ -9,17 +9,17 @@ and make sure a redis server is available on the default port */ describe_only(() => { return process.env.PARSE_SERVER_TEST_CACHE === 'redis'; -})('RedisCacheAdapter', function() { +})('RedisCacheAdapter', function () { const KEY = 'hello'; const VALUE = 'world'; function wait(sleep) { - return new Promise(function(resolve) { + return new Promise(function (resolve) { setTimeout(resolve, sleep); }); } - it('should get/set/clear', done => { + it('should get/set/clear', (done) => { const cache = new RedisCacheAdapter({ ttl: NaN, }); @@ -27,85 +27,92 @@ describe_only(() => { cache .put(KEY, VALUE) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(VALUE)) + .then((value) => expect(value).toEqual(VALUE)) .then(() => cache.clear()) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(null)) + .then((value) => expect(value).toEqual(null)) .then(done); }); - it('should expire after ttl', done => { + it('should expire after ttl', (done) => { const cache = new RedisCacheAdapter(null, 50); cache .put(KEY, VALUE) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(VALUE)) + .then((value) => expect(value).toEqual(VALUE)) .then(wait.bind(null, 52)) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(null)) + .then((value) => expect(value).toEqual(null)) .then(done); }); - it('should not store value for ttl=0', done => { + it('should not store value for ttl=0', (done) => { const cache = new RedisCacheAdapter(null, 5); cache .put(KEY, VALUE, 0) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(null)) + .then((value) => expect(value).toEqual(null)) .then(done); }); - it('should not expire when ttl=Infinity', done => { + it('should not expire when ttl=Infinity', (done) => { const cache = new RedisCacheAdapter(null, 1); cache .put(KEY, VALUE, Infinity) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(VALUE)) + .then((value) => expect(value).toEqual(VALUE)) .then(wait.bind(null, 5)) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(VALUE)) + .then((value) => expect(value).toEqual(VALUE)) .then(done); }); - it('should fallback to default ttl', done => { + it('should fallback to default ttl', (done) => { const cache = new RedisCacheAdapter(null, 1); let promise = Promise.resolve(); - [-100, null, undefined, 'not number', true].forEach(ttl => { + [-100, null, undefined, 'not number', true].forEach((ttl) => { promise = promise.then(() => cache .put(KEY, VALUE, ttl) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(VALUE)) + .then((value) => expect(value).toEqual(VALUE)) .then(wait.bind(null, 5)) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(null)) + .then((value) => expect(value).toEqual(null)) ); }); promise.then(done); }); - it('should find un-expired records', done => { + it('should find un-expired records', (done) => { const cache = new RedisCacheAdapter(null, 5); cache .put(KEY, VALUE) .then(() => cache.get(KEY)) - .then(value => expect(value).toEqual(VALUE)) + .then((value) => expect(value).toEqual(VALUE)) .then(wait.bind(null, 1)) .then(() => cache.get(KEY)) - .then(value => expect(value).not.toEqual(null)) + .then((value) => expect(value).not.toEqual(null)) .then(done); }); + + it('handleShutdown, close connection', async () => { + const cache = new RedisCacheAdapter(null, 5); + + await cache.handleShutdown(); + expect(cache.client.connected).toBe(false); + }); }); describe_only(() => { return process.env.PARSE_SERVER_TEST_CACHE === 'redis'; -})('RedisCacheAdapter/KeyPromiseQueue', function() { +})('RedisCacheAdapter/KeyPromiseQueue', function () { const KEY1 = 'key1'; const KEY2 = 'key2'; const VALUE = 'hello'; @@ -120,7 +127,7 @@ describe_only(() => { return Object.keys(cache.queue.queue).length; } - it('it should clear completed operations from queue', done => { + it('it should clear completed operations from queue', (done) => { const cache = new RedisCacheAdapter({ ttl: NaN }); // execute a bunch of operations in sequence @@ -142,7 +149,7 @@ describe_only(() => { promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done); }); - it('it should count per key chained operations correctly', done => { + it('it should count per key chained operations correctly', (done) => { const cache = new RedisCacheAdapter({ ttl: NaN }); let key1Promise = Promise.resolve(); @@ -168,7 +175,7 @@ describe_only(() => { describe_only(() => { return process.env.PARSE_SERVER_TEST_CACHE === 'redis'; -})('Redis Performance', function() { +})('Redis Performance', function () { let cacheAdapter; let getSpy; let putSpy; diff --git a/src/Adapters/Cache/RedisCacheAdapter/index.js b/src/Adapters/Cache/RedisCacheAdapter/index.js index cf0bb71638..0619d1ebcb 100644 --- a/src/Adapters/Cache/RedisCacheAdapter/index.js +++ b/src/Adapters/Cache/RedisCacheAdapter/index.js @@ -9,7 +9,7 @@ function debug() { logger.debug.apply(logger, ['RedisCacheAdapter', ...arguments]); } -const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0; +const isValidTTL = (ttl) => typeof ttl === 'number' && ttl > 0; export class RedisCacheAdapter { constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) { @@ -18,13 +18,27 @@ export class RedisCacheAdapter { this.queue = new KeyPromiseQueue(); } + handleShutdown() { + if (!this.client) { + return Promise.resolve(); + } + return new Promise((resolve) => { + this.client.quit((err) => { + if (err) { + logger.error('RedisCacheAdapter error on shutdown', { error: err }); + } + resolve(); + }); + }); + } + get(key) { debug('get', key); return this.queue.enqueue( key, () => - new Promise(resolve => { - this.client.get(key, function(err, res) { + new Promise((resolve) => { + this.client.get(key, function (err, res) { debug('-> get', key, res); if (!res) { return resolve(null); @@ -48,8 +62,8 @@ export class RedisCacheAdapter { return this.queue.enqueue( key, () => - new Promise(resolve => { - this.client.set(key, value, function() { + new Promise((resolve) => { + this.client.set(key, value, function () { resolve(); }); }) @@ -63,8 +77,8 @@ export class RedisCacheAdapter { return this.queue.enqueue( key, () => - new Promise(resolve => { - this.client.psetex(key, ttl, value, function() { + new Promise((resolve) => { + this.client.psetex(key, ttl, value, function () { resolve(); }); }) @@ -76,8 +90,8 @@ export class RedisCacheAdapter { return this.queue.enqueue( key, () => - new Promise(resolve => { - this.client.del(key, function() { + new Promise((resolve) => { + this.client.del(key, function () { resolve(); }); }) @@ -89,8 +103,8 @@ export class RedisCacheAdapter { return this.queue.enqueue( FLUSH_DB_KEY, () => - new Promise(resolve => { - this.client.flushdb(function() { + new Promise((resolve) => { + this.client.flushdb(function () { resolve(); }); }) diff --git a/src/ParseServer.js b/src/ParseServer.js index 1fd279b4c7..081a8c15a4 100644 --- a/src/ParseServer.js +++ b/src/ParseServer.js @@ -85,7 +85,7 @@ class ParseServer { serverStartComplete(); } }) - .catch(error => { + .catch((error) => { if (serverStartComplete) { serverStartComplete(error); } else { @@ -126,6 +126,10 @@ class ParseServer { if (fileAdapter && typeof fileAdapter.handleShutdown === 'function') { promises.push(fileAdapter.handleShutdown()); } + const { adapter: cacheAdapter } = this.config.cacheController; + if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') { + promises.push(cacheAdapter.handleShutdown()); + } return (promises.length > 0 ? Promise.all(promises) : Promise.resolve() @@ -154,7 +158,7 @@ class ParseServer { }) ); - api.use('/health', function(req, res) { + api.use('/health', function (req, res) { res.json({ status: 'ok', }); @@ -179,7 +183,7 @@ class ParseServer { if (!process.env.TESTING) { //This causes tests to spew some useless warnings, so disable in test /* istanbul ignore next */ - process.on('uncaughtException', err => { + process.on('uncaughtException', (err) => { if (err.code === 'EADDRINUSE') { // user-friendly message for this common error process.stderr.write( @@ -192,7 +196,7 @@ class ParseServer { }); // verify the server url after a 'mount' event is received /* istanbul ignore next */ - api.on('mount', function() { + api.on('mount', function () { ParseServer.verifyServerUrl(); }); } @@ -334,8 +338,8 @@ class ParseServer { if (Parse.serverURL) { const request = require('./request'); request({ url: Parse.serverURL.replace(/\/$/, '') + '/health' }) - .catch(response => response) - .then(response => { + .catch((response) => response) + .then((response) => { const json = response.data || null; if ( response.status !== 200 || @@ -368,7 +372,7 @@ function addParseCloud() { } function injectDefaults(options: ParseServerOptions) { - Object.keys(defaults).forEach(key => { + Object.keys(defaults).forEach((key) => { if (!Object.prototype.hasOwnProperty.call(options, key)) { options[key] = defaults[key]; } @@ -424,12 +428,12 @@ function injectDefaults(options: ParseServerOptions) { } // Merge protectedFields options with defaults. - Object.keys(defaults.protectedFields).forEach(c => { + Object.keys(defaults.protectedFields).forEach((c) => { const cur = options.protectedFields[c]; if (!cur) { options.protectedFields[c] = defaults.protectedFields[c]; } else { - Object.keys(defaults.protectedFields[c]).forEach(r => { + Object.keys(defaults.protectedFields[c]).forEach((r) => { const unq = new Set([ ...(options.protectedFields[c][r] || []), ...defaults.protectedFields[c][r], @@ -453,7 +457,7 @@ function configureListeners(parseServer) { const sockets = {}; /* Currently, express doesn't shut down immediately after receiving SIGINT/SIGTERM if it has client connections that haven't timed out. (This is a known issue with node - https://github.com/nodejs/node/issues/2642) This function, along with `destroyAliveConnections()`, intend to fix this behavior such that parse server will close all open connections and initiate the shutdown process as soon as it receives a SIGINT/SIGTERM signal. */ - server.on('connection', socket => { + server.on('connection', (socket) => { const socketId = socket.remoteAddress + ':' + socket.remotePort; sockets[socketId] = socket; socket.on('close', () => { @@ -461,7 +465,7 @@ function configureListeners(parseServer) { }); }); - const destroyAliveConnections = function() { + const destroyAliveConnections = function () { for (const socketId in sockets) { try { sockets[socketId].destroy(); @@ -471,7 +475,7 @@ function configureListeners(parseServer) { } }; - const handleShutdown = function() { + const handleShutdown = function () { process.stdout.write('Termination signal received. Shutting down.'); destroyAliveConnections(); server.close(); From 760c0653f232cdb822488bf005a324eb428db561 Mon Sep 17 00:00:00 2001 From: Promise Xu Date: Wed, 29 Apr 2020 19:02:34 -0400 Subject: [PATCH 2/2] connected value need to be tested in setTimeout --- spec/RedisCacheAdapter.spec.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spec/RedisCacheAdapter.spec.js b/spec/RedisCacheAdapter.spec.js index 11962c9181..d4ff0327ab 100644 --- a/spec/RedisCacheAdapter.spec.js +++ b/spec/RedisCacheAdapter.spec.js @@ -106,7 +106,9 @@ describe_only(() => { const cache = new RedisCacheAdapter(null, 5); await cache.handleShutdown(); - expect(cache.client.connected).toBe(false); + setTimeout(() => { + expect(cache.client.connected).toBe(false); + }, 0); }); });