Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Upgrade Redis 3 to 4 for LiveQuery #8333

Merged
merged 13 commits into from
Nov 26, 2022
2 changes: 1 addition & 1 deletion spec/ParseGraphQLServer.spec.js
Original file line number Diff line number Diff line change
@@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => {
const expressApp = express();
httpServer = http.createServer(expressApp);
expressApp.use('/parse', parseServer.app);
parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {
parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {
port: 1338,
});
parseGraphQLServer.applyGraphQL(expressApp);
56 changes: 56 additions & 0 deletions spec/ParseLiveQueryRedis.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') {
describe('ParseLiveQuery redis', () => {
afterEach(async () => {
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
client.close();
});
it('can connect', async () => {
await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
const subscription = await new Parse.Query('TestObject').subscribe();
const [object] = await Promise.all([
new Parse.Object('TestObject').save(),
new Promise(resolve =>
subscription.on('create', () => {
resolve();
})
),
]);
await Promise.all([
new Promise(resolve =>
subscription.on('delete', () => {
resolve();
})
),
object.destroy(),
]);
});

it('can call connect twice', async () => {
const server = await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
await server.config.liveQueryController.connect();
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
await server.liveQueryServer.connect();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
});
});
}
18 changes: 9 additions & 9 deletions spec/ParseLiveQueryServer.spec.js
Original file line number Diff line number Diff line change
@@ -94,29 +94,29 @@ describe('ParseLiveQueryServer', function () {
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
});

it('can be initialized from ParseServer', function () {
it('can be initialized from ParseServer', async () => {
const httpServer = {};
const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {});
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {});

expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
});

it('can be initialized from ParseServer without httpServer', function (done) {
const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, {
it('can be initialized from ParseServer without httpServer', async () => {
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, {
port: 22345,
});

expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
parseLiveQueryServer.server.close(done);
await new Promise(resolve => parseLiveQueryServer.server.close(resolve));
});

describe_only_db('mongo')('initialization', () => {
it('can be initialized through ParseServer without liveQueryServerOptions', function (done) {
const parseServer = ParseServer.start({
it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) {
const parseServer = await ParseServer.start({
appId: 'hello',
masterKey: 'world',
port: 22345,
@@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () {
});
});

it('can be initialized through ParseServer with liveQueryServerOptions', function (done) {
const parseServer = ParseServer.start({
it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) {
const parseServer = await ParseServer.start({
appId: 'hello',
masterKey: 'world',
port: 22346,
6 changes: 4 additions & 2 deletions spec/RedisPubSub.spec.js
Original file line number Diff line number Diff line change
@@ -15,7 +15,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
@@ -28,7 +29,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
24 changes: 13 additions & 11 deletions spec/helper.js
Original file line number Diff line number Diff line change
@@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => {
port,
});
cache.clear();
parseServer = ParseServer.start(newConfiguration);
parseServer.expressApp.use('/1', err => {
console.error(err);
fail('should not call next');
});
server = parseServer.server;
server.on('connection', connection => {
const key = `${connection.remoteAddress}:${connection.remotePort}`;
openConnections[key] = connection;
connection.on('close', () => {
delete openConnections[key];
ParseServer.start(newConfiguration).then(_parseServer => {
parseServer = _parseServer;
parseServer.expressApp.use('/1', err => {
console.error(err);
fail('should not call next');
});
server = parseServer.server;
server.on('connection', connection => {
const key = `${connection.remoteAddress}:${connection.remotePort}`;
openConnections[key] = connection;
connection.on('close', () => {
delete openConnections[key];
});
});
});
} catch (error) {
4 changes: 2 additions & 2 deletions src/Adapters/PubSub/RedisPubSub.js
Original file line number Diff line number Diff line change
@@ -2,12 +2,12 @@ import { createClient } from 'redis';

function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

const RedisPubSub = {
4 changes: 4 additions & 0 deletions src/Controllers/LiveQueryController.js
Original file line number Diff line number Diff line change
@@ -21,6 +21,10 @@ export class LiveQueryController {
this.liveQueryPublisher = new ParseCloudCodePublisher(config);
}

connect() {
return this.liveQueryPublisher.connect();
}

onAfterSave(
className: string,
currentObject: any,
9 changes: 9 additions & 0 deletions src/LiveQuery/ParseCloudCodePublisher.js
Original file line number Diff line number Diff line change
@@ -11,6 +11,15 @@ class ParseCloudCodePublisher {
this.parsePublisher = ParsePubSub.createPublisher(config);
}

async connect() {
if (typeof this.parsePublisher.connect === 'function') {
if (this.parsePublisher.isOpen) {
return;
}
return Promise.resolve(this.parsePublisher.connect());
}
}

onCloudCodeAfterSave(request: any): void {
this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request);
}
33 changes: 24 additions & 9 deletions src/LiveQuery/ParseLiveQueryServer.js
Original file line number Diff line number Diff line change
@@ -73,15 +73,25 @@ class ParseLiveQueryServer {
parseWebsocket => this._onConnect(parseWebsocket),
config
);

// Initialize subscriber
this.subscriber = ParsePubSub.createSubscriber(config);
this.subscriber.subscribe(Parse.applicationId + 'afterSave');
this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
this.subscriber.subscribe(Parse.applicationId + 'clearCache');
// Register message handler for subscriber. When publisher get messages, it will publish message
// to the subscribers and the handler will be called.
this.subscriber.on('message', (channel, messageStr) => {
if (!this.subscriber.connect) {
this.connect();
}
}

async connect() {
if (this.subscriber.isOpen) {
return;
}
if (typeof this.subscriber.connect === 'function') {
await Promise.resolve(this.subscriber.connect());
} else {
this.subscriber.isOpen = true;
}
this._createSubscribers();
}
_createSubscribers() {
const messageRecieved = (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr);
let message;
try {
@@ -102,7 +112,12 @@ class ParseLiveQueryServer {
} else {
logger.error('Get message %s from unknown channel %j', message, channel);
}
});
};
this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr));
for (const field of ['afterSave', 'afterDelete', 'clearCache']) {
const channel = `${Parse.applicationId}${field}`;
this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr));
}
}

// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
20 changes: 14 additions & 6 deletions src/ParseServer.js
Original file line number Diff line number Diff line change
@@ -77,7 +77,12 @@ class ParseServer {

const allControllers = controllers.getControllers(options);

const { loggerController, databaseController, hooksController } = allControllers;
const {
loggerController,
databaseController,
hooksController,
liveQueryController,
} = allControllers;
this.config = Config.put(Object.assign({}, options, allControllers));

logging.setLogger(loggerController);
@@ -98,6 +103,7 @@ class ParseServer {
) {
startupPromises.push(options.cacheAdapter.connect());
}
startupPromises.push(liveQueryController.connect());
await Promise.all(startupPromises);
if (serverStartComplete) {
serverStartComplete();
@@ -263,7 +269,7 @@ class ParseServer {
* @param {Function} callback called when the server has started
* @returns {ParseServer} the parse server instance
*/
start(options: ParseServerOptions, callback: ?() => void) {
async start(options: ParseServerOptions, callback: ?() => void) {
const app = express();
if (options.middleware) {
let middleware;
@@ -307,7 +313,7 @@ class ParseServer {
this.server = server;

if (options.startLiveQueryServer || options.liveQueryServerOptions) {
this.liveQueryServer = ParseServer.createLiveQueryServer(
this.liveQueryServer = await ParseServer.createLiveQueryServer(
server,
options.liveQueryServerOptions,
options
@@ -338,9 +344,9 @@ class ParseServer {
* @param {Server} httpServer an optional http server to pass
* @param {LiveQueryServerOptions} config options for the liveQueryServer
* @param {ParseServerOptions} options options for the ParseServer
* @returns {ParseLiveQueryServer} the live query server instance
* @returns {Promise<ParseLiveQueryServer>} the live query server instance
*/
static createLiveQueryServer(
static async createLiveQueryServer(
httpServer,
config: LiveQueryServerOptions,
options: ParseServerOptions
@@ -350,7 +356,9 @@ class ParseServer {
httpServer = require('http').createServer(app);
httpServer.listen(config.port);
}
return new ParseLiveQueryServer(httpServer, config, options);
const server = new ParseLiveQueryServer(httpServer, config, options);
await server.connect();
return server;
}

static verifyServerUrl(callback) {