diff --git a/spec/ParseGraphQLServer.spec.js b/spec/ParseGraphQLServer.spec.js index 6bdd3b10f5..9ac60f8ae6 100644 --- a/spec/ParseGraphQLServer.spec.js +++ b/spec/ParseGraphQLServer.spec.js @@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => { const expressApp = express(); httpServer = http.createServer(expressApp); expressApp.use('/parse', parseServer.app); - parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, { + parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, { port: 1338, }); parseGraphQLServer.applyGraphQL(expressApp); diff --git a/spec/ParseLiveQueryRedis.spec.js b/spec/ParseLiveQueryRedis.spec.js new file mode 100644 index 0000000000..3187d23cc8 --- /dev/null +++ b/spec/ParseLiveQueryRedis.spec.js @@ -0,0 +1,56 @@ +if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') { + describe('ParseLiveQuery redis', () => { + afterEach(async () => { + const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient(); + client.close(); + }); + it('can connect', async () => { + await reconfigureServer({ + startLiveQueryServer: true, + liveQuery: { + classNames: ['TestObject'], + redisURL: 'redis://localhost:6379', + }, + liveQueryServerOptions: { + redisURL: 'redis://localhost:6379', + }, + }); + const subscription = await new Parse.Query('TestObject').subscribe(); + const [object] = await Promise.all([ + new Parse.Object('TestObject').save(), + new Promise(resolve => + subscription.on('create', () => { + resolve(); + }) + ), + ]); + await Promise.all([ + new Promise(resolve => + subscription.on('delete', () => { + resolve(); + }) + ), + object.destroy(), + ]); + }); + + it('can call connect twice', async () => { + const server = await reconfigureServer({ + startLiveQueryServer: true, + liveQuery: { + classNames: ['TestObject'], + redisURL: 'redis://localhost:6379', + }, + liveQueryServerOptions: { + redisURL: 'redis://localhost:6379', + }, + }); + expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue(); + await server.config.liveQueryController.connect(); + expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue(); + expect(server.liveQueryServer.subscriber.isOpen).toBe(true); + await server.liveQueryServer.connect(); + expect(server.liveQueryServer.subscriber.isOpen).toBe(true); + }); + }); +} diff --git a/spec/ParseLiveQueryServer.spec.js b/spec/ParseLiveQueryServer.spec.js index 0d1a1e6387..7ec1b30941 100644 --- a/spec/ParseLiveQueryServer.spec.js +++ b/spec/ParseLiveQueryServer.spec.js @@ -94,29 +94,29 @@ describe('ParseLiveQueryServer', function () { expect(parseLiveQueryServer.subscriptions.size).toBe(0); }); - it('can be initialized from ParseServer', function () { + it('can be initialized from ParseServer', async () => { const httpServer = {}; - const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {}); + const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {}); expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0); }); - it('can be initialized from ParseServer without httpServer', function (done) { - const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, { + it('can be initialized from ParseServer without httpServer', async () => { + const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, { port: 22345, }); expect(parseLiveQueryServer.clientId).toBeUndefined(); expect(parseLiveQueryServer.clients.size).toBe(0); expect(parseLiveQueryServer.subscriptions.size).toBe(0); - parseLiveQueryServer.server.close(done); + await new Promise(resolve => parseLiveQueryServer.server.close(resolve)); }); describe_only_db('mongo')('initialization', () => { - it('can be initialized through ParseServer without liveQueryServerOptions', function (done) { - const parseServer = ParseServer.start({ + it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) { + const parseServer = await ParseServer.start({ appId: 'hello', masterKey: 'world', port: 22345, @@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () { }); }); - it('can be initialized through ParseServer with liveQueryServerOptions', function (done) { - const parseServer = ParseServer.start({ + it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) { + const parseServer = await ParseServer.start({ appId: 'hello', masterKey: 'world', port: 22346, diff --git a/spec/RedisPubSub.spec.js b/spec/RedisPubSub.spec.js index c7def313cd..b68448ef51 100644 --- a/spec/RedisPubSub.spec.js +++ b/spec/RedisPubSub.spec.js @@ -15,7 +15,8 @@ describe('RedisPubSub', function () { }); const redis = require('redis'); - expect(redis.createClient).toHaveBeenCalledWith('redisAddress', { + expect(redis.createClient).toHaveBeenCalledWith({ + url: 'redisAddress', socket_keepalive: true, no_ready_check: true, }); @@ -28,7 +29,8 @@ describe('RedisPubSub', function () { }); const redis = require('redis'); - expect(redis.createClient).toHaveBeenCalledWith('redisAddress', { + expect(redis.createClient).toHaveBeenCalledWith({ + url: 'redisAddress', socket_keepalive: true, no_ready_check: true, }); diff --git a/spec/helper.js b/spec/helper.js index 1afa0fc24c..40df5e627e 100644 --- a/spec/helper.js +++ b/spec/helper.js @@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => { port, }); cache.clear(); - parseServer = ParseServer.start(newConfiguration); - parseServer.expressApp.use('/1', err => { - console.error(err); - fail('should not call next'); - }); - server = parseServer.server; - server.on('connection', connection => { - const key = `${connection.remoteAddress}:${connection.remotePort}`; - openConnections[key] = connection; - connection.on('close', () => { - delete openConnections[key]; + ParseServer.start(newConfiguration).then(_parseServer => { + parseServer = _parseServer; + parseServer.expressApp.use('/1', err => { + console.error(err); + fail('should not call next'); + }); + server = parseServer.server; + server.on('connection', connection => { + const key = `${connection.remoteAddress}:${connection.remotePort}`; + openConnections[key] = connection; + connection.on('close', () => { + delete openConnections[key]; + }); }); }); } catch (error) { diff --git a/src/Adapters/PubSub/RedisPubSub.js b/src/Adapters/PubSub/RedisPubSub.js index 7a886d5236..cc2b3a9792 100644 --- a/src/Adapters/PubSub/RedisPubSub.js +++ b/src/Adapters/PubSub/RedisPubSub.js @@ -2,12 +2,12 @@ import { createClient } from 'redis'; function createPublisher({ redisURL, redisOptions = {} }): any { redisOptions.no_ready_check = true; - return createClient(redisURL, redisOptions); + return createClient({ url: redisURL, ...redisOptions }); } function createSubscriber({ redisURL, redisOptions = {} }): any { redisOptions.no_ready_check = true; - return createClient(redisURL, redisOptions); + return createClient({ url: redisURL, ...redisOptions }); } const RedisPubSub = { diff --git a/src/Controllers/LiveQueryController.js b/src/Controllers/LiveQueryController.js index 9a5b6d0ef1..b3ee7fcf65 100644 --- a/src/Controllers/LiveQueryController.js +++ b/src/Controllers/LiveQueryController.js @@ -21,6 +21,10 @@ export class LiveQueryController { this.liveQueryPublisher = new ParseCloudCodePublisher(config); } + connect() { + return this.liveQueryPublisher.connect(); + } + onAfterSave( className: string, currentObject: any, diff --git a/src/LiveQuery/ParseCloudCodePublisher.js b/src/LiveQuery/ParseCloudCodePublisher.js index 3ecd740e12..0e0dce1417 100644 --- a/src/LiveQuery/ParseCloudCodePublisher.js +++ b/src/LiveQuery/ParseCloudCodePublisher.js @@ -11,6 +11,15 @@ class ParseCloudCodePublisher { this.parsePublisher = ParsePubSub.createPublisher(config); } + async connect() { + if (typeof this.parsePublisher.connect === 'function') { + if (this.parsePublisher.isOpen) { + return; + } + return Promise.resolve(this.parsePublisher.connect()); + } + } + onCloudCodeAfterSave(request: any): void { this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request); } diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index 02c6ad30a1..78cdf55e07 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -73,15 +73,25 @@ class ParseLiveQueryServer { parseWebsocket => this._onConnect(parseWebsocket), config ); - - // Initialize subscriber this.subscriber = ParsePubSub.createSubscriber(config); - this.subscriber.subscribe(Parse.applicationId + 'afterSave'); - this.subscriber.subscribe(Parse.applicationId + 'afterDelete'); - this.subscriber.subscribe(Parse.applicationId + 'clearCache'); - // Register message handler for subscriber. When publisher get messages, it will publish message - // to the subscribers and the handler will be called. - this.subscriber.on('message', (channel, messageStr) => { + if (!this.subscriber.connect) { + this.connect(); + } + } + + async connect() { + if (this.subscriber.isOpen) { + return; + } + if (typeof this.subscriber.connect === 'function') { + await Promise.resolve(this.subscriber.connect()); + } else { + this.subscriber.isOpen = true; + } + this._createSubscribers(); + } + _createSubscribers() { + const messageRecieved = (channel, messageStr) => { logger.verbose('Subscribe message %j', messageStr); let message; try { @@ -102,7 +112,12 @@ class ParseLiveQueryServer { } else { logger.error('Get message %s from unknown channel %j', message, channel); } - }); + }; + this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr)); + for (const field of ['afterSave', 'afterDelete', 'clearCache']) { + const channel = `${Parse.applicationId}${field}`; + this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr)); + } } // Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes. diff --git a/src/ParseServer.js b/src/ParseServer.js index 0b2c985d1e..da1296d5d2 100644 --- a/src/ParseServer.js +++ b/src/ParseServer.js @@ -77,7 +77,12 @@ class ParseServer { const allControllers = controllers.getControllers(options); - const { loggerController, databaseController, hooksController } = allControllers; + const { + loggerController, + databaseController, + hooksController, + liveQueryController, + } = allControllers; this.config = Config.put(Object.assign({}, options, allControllers)); logging.setLogger(loggerController); @@ -98,6 +103,7 @@ class ParseServer { ) { startupPromises.push(options.cacheAdapter.connect()); } + startupPromises.push(liveQueryController.connect()); await Promise.all(startupPromises); if (serverStartComplete) { serverStartComplete(); @@ -263,7 +269,7 @@ class ParseServer { * @param {Function} callback called when the server has started * @returns {ParseServer} the parse server instance */ - start(options: ParseServerOptions, callback: ?() => void) { + async start(options: ParseServerOptions, callback: ?() => void) { const app = express(); if (options.middleware) { let middleware; @@ -307,7 +313,7 @@ class ParseServer { this.server = server; if (options.startLiveQueryServer || options.liveQueryServerOptions) { - this.liveQueryServer = ParseServer.createLiveQueryServer( + this.liveQueryServer = await ParseServer.createLiveQueryServer( server, options.liveQueryServerOptions, options @@ -338,9 +344,9 @@ class ParseServer { * @param {Server} httpServer an optional http server to pass * @param {LiveQueryServerOptions} config options for the liveQueryServer * @param {ParseServerOptions} options options for the ParseServer - * @returns {ParseLiveQueryServer} the live query server instance + * @returns {Promise} the live query server instance */ - static createLiveQueryServer( + static async createLiveQueryServer( httpServer, config: LiveQueryServerOptions, options: ParseServerOptions @@ -350,7 +356,9 @@ class ParseServer { httpServer = require('http').createServer(app); httpServer.listen(config.port); } - return new ParseLiveQueryServer(httpServer, config, options); + const server = new ParseLiveQueryServer(httpServer, config, options); + await server.connect(); + return server; } static verifyServerUrl(callback) {