Skip to content

Commit

Permalink
feat: Sockets Preview (#231)
Browse files Browse the repository at this point in the history
* Sockets WIP

* Sockets WIP

* Sockets WIP: fix dateParser

* Sockets WIP: fix react loading

Fixes #221
  • Loading branch information
paveltiunov authored Oct 15, 2019
1 parent aa736b1 commit 89fc762
Show file tree
Hide file tree
Showing 30 changed files with 22,422 additions and 18,368 deletions.
62 changes: 62 additions & 0 deletions packages/cubejs-api-gateway/LocalSubscriptionStore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
class LocalSubscriptionStore {
constructor(options) {
options = options || {};
this.connections = {};
this.hearBeatInterval = options.heartBeatInterval || 60;
}

async getSubscription(connectionId, subscriptionId) {
const connection = this.getConnection(connectionId);
return connection.subscriptions[subscriptionId];
}

async subscribe(connectionId, subscriptionId, subscription) {
const connection = this.getConnection(connectionId);
connection.subscriptions[subscriptionId] = {
...subscription,
timestamp: new Date()
};
}

async unsubscribe(connectionId, subscriptionId) {
const connection = this.getConnection(connectionId);
delete connection.subscriptions[subscriptionId];
}

async getAllSubscriptions() {
return Object.keys(this.connections).map(connectionId => {
Object.keys(this.connections[connectionId].subscriptions).filter(
subscriptionId => new Date().getTime() -
this.connections[connectionId].subscriptions[subscriptionId].timestamp.getTime() >
this.hearBeatInterval * 4 * 1000
).forEach(subscriptionId => { delete this.connections[connectionId].subscriptions[subscriptionId]; });

return Object.keys(this.connections[connectionId].subscriptions)
.map(subscriptionId => ({
connectionId,
...this.connections[connectionId].subscriptions[subscriptionId]
}));
}).reduce((a, b) => a.concat(b), []);
}

async cleanupSubscriptions(connectionId) {
delete this.connections[connectionId];
}

async getAuthContext(connectionId) {
return this.getConnection(connectionId).authContext;
}

async setAuthContext(connectionId, authContext) {
this.getConnection(connectionId).authContext = authContext;
}

getConnection(connectionId) {
if (!this.connections[connectionId]) {
this.connections[connectionId] = { subscriptions: {} };
}
return this.connections[connectionId];
}
}

module.exports = LocalSubscriptionStore;
104 changes: 104 additions & 0 deletions packages/cubejs-api-gateway/SubscriptionServer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
const UserError = require('./UserError');

const methodParams = {
load: ['query'],
sql: ['query'],
meta: [],
subscribe: ['query'],
unsubscribe: []
};

class SubscriptionServer {
constructor(apiGateway, sendMessage, subscriptionStore) {
this.apiGateway = apiGateway;
this.sendMessage = sendMessage;
this.subscriptionStore = subscriptionStore;
}

resultFn(connectionId, messageId) {
return (message, { status } = {}) => this.sendMessage(connectionId, { messageId, message, status: status || 200 });
}

async processMessage(connectionId, message, isSubscription) {
let context = {};
try {
if (typeof message === 'string') {
message = JSON.parse(message);
}
if (message.authorization) {
const newContext = {};
await this.apiGateway.checkAuthFn(newContext, message.authorization);
await this.subscriptionStore.setAuthContext(connectionId, newContext);
this.sendMessage(connectionId, { handshake: true });
return;
}

if (message.unsubscribe) {
await this.subscriptionStore.unsubscribe(connectionId, message.unsubscribe);
return;
}

if (!message.messageId) {
throw new UserError(`messageId is required`);
}

context = await this.subscriptionStore.getAuthContext(connectionId);

if (!context) {
await this.sendMessage(
connectionId,
{
messageId: message.messageId,
message: { error: 'Not authorized' },
status: 403
}
);
return;
}

if (!methodParams[message.method]) {
throw new UserError(`Unsupported method: ${message.method}`);
}

const allowedParams = methodParams[message.method];
const params = allowedParams.map(k => ({ [k]: (message.params || {})[k] }))
.reduce((a, b) => ({ ...a, ...b }), {});
await this.apiGateway[message.method]({
...params,
context,
isSubscription,
res: this.resultFn(connectionId, message.messageId),
subscriptionState: async () => {
const subscription = await this.subscriptionStore.getSubscription(connectionId, message.messageId);
return subscription && subscription.state;
},
subscribe: async (state) => this.subscriptionStore.subscribe(connectionId, message.messageId, {
message,
state
}),
unsubscribe: async () => this.subscriptionStore.unsubscribe(connectionId, message.messageId)
});
await this.sendMessage(connectionId, { messageProcessedId: message.messageId });
} catch (e) {
this.apiGateway.handleError({
e,
query: message.query,
res: this.resultFn(connectionId, message.messageId),
context
});
}
}

async processSubscriptions() {
const allSubscriptions = await this.subscriptionStore.getAllSubscriptions();
await Promise.all(allSubscriptions.map(async subscription => {
await this.processMessage(subscription.connectionId, subscription.message, true);
}));
}

async disconnect(connectionId) {
await this.subscriptionStore.cleanupSubscriptions(connectionId);
}
}

module.exports = SubscriptionServer;
14 changes: 7 additions & 7 deletions packages/cubejs-api-gateway/dateParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ const UserError = require('./UserError');
module.exports = (dateString) => {
let momentRange;
dateString = dateString.toLowerCase();
if (dateString.match(/(this|last)\s+(day|week|month|year|quarter)/)) {
const match = dateString.match(/(this|last)\s+(day|week|month|year|quarter)/);
if (dateString.match(/(this|last)\s+(day|week|month|year|quarter|hour|minute|second)/)) {
const match = dateString.match(/(this|last)\s+(day|week|month|year|quarter|hour|minute|second)/);
let start = moment();
let end = moment();
if (match[1] === 'last') {
Expand All @@ -15,18 +15,18 @@ module.exports = (dateString) => {
}
const span = match[2] === 'week' ? 'isoWeek' : match[2];
momentRange = [start.startOf(span), end.endOf(span)];
} else if (dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter)/)) {
const match = dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter)/);
} else if (dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter|hour|minute|second)/)) {
const match = dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter|hour|minute|second)/);
const span = match[2] === 'week' ? 'isoWeek' : match[2];
momentRange = [
moment().add(-parseInt(match[1], 10) - 1, match[2]).startOf(span),
moment().add(-1, match[2]).endOf(span)
];
} else if (dateString.match(/today/)) {
momentRange = [moment(), moment()];
momentRange = [moment().startOf('day'), moment().endOf('day')];
} else if (dateString.match(/yesterday/)) {
const yesterday = moment().add(-1, 'day');
momentRange = [yesterday, yesterday];
momentRange = [yesterday.startOf('day'), yesterday.endOf('day')];
} else {
const results = chrono.parse(dateString);
if (!results) {
Expand All @@ -40,5 +40,5 @@ module.exports = (dateString) => {
results[0].start.moment()
];
}
return momentRange.map(d => d.format(moment.HTML5_FMT.DATE));
return momentRange.map(d => d.format(moment.HTML5_FMT.DATETIME_LOCAL_MS));
};
Loading

0 comments on commit 89fc762

Please sign in to comment.