diff --git a/app/lib/methods/subscriptions/rooms.js b/app/lib/methods/subscriptions/rooms.js index 91be40fc9e..5cf610a048 100644 --- a/app/lib/methods/subscriptions/rooms.js +++ b/app/lib/methods/subscriptions/rooms.js @@ -9,6 +9,7 @@ import random from '../../../utils/random'; import store from '../../createStore'; import { roomsRequest } from '../../../actions/rooms'; import { notificationReceived } from '../../../actions/notification'; +import buildMessage from '../helpers/buildMessage'; const removeListener = listener => listener.stop(); @@ -16,8 +17,12 @@ let connectedListener; let disconnectedListener; let streamListener; let subServer; +let subQueue = {}; +let subTimer = null; +let roomQueue = {}; +let roomTimer = null; +const WINDOW_TIME = 1000; -// TODO: batch execution const createOrUpdateSubscription = async(subscription, room) => { try { const db = database.active; @@ -128,32 +133,32 @@ const createOrUpdateSubscription = async(subscription, room) => { } } - // if (tmp.lastMessage) { - // const lastMessage = buildMessage(tmp.lastMessage); - // const messagesCollection = db.collections.get('messages'); - // let messageRecord; - // try { - // messageRecord = await messagesCollection.find(lastMessage._id); - // } catch (error) { - // // Do nothing - // } + if (tmp.lastMessage) { + const lastMessage = buildMessage(tmp.lastMessage); + const messagesCollection = db.collections.get('messages'); + let messageRecord; + try { + messageRecord = await messagesCollection.find(lastMessage._id); + } catch (error) { + // Do nothing + } - // if (messageRecord) { - // batch.push( - // messageRecord.prepareUpdate(() => { - // Object.assign(messageRecord, lastMessage); - // }) - // ); - // } else { - // batch.push( - // messagesCollection.prepareCreate((m) => { - // m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema); - // m.subscription.id = lastMessage.rid; - // return Object.assign(m, lastMessage); - // }) - // ); - // } - // } + if (messageRecord) { + batch.push( + messageRecord.prepareUpdate(() => { + Object.assign(messageRecord, lastMessage); + }) + ); + } else { + batch.push( + messagesCollection.prepareCreate((m) => { + m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema); + m.subscription.id = lastMessage.rid; + return Object.assign(m, lastMessage); + }) + ); + } + } await db.batch(...batch); }); @@ -162,6 +167,34 @@ const createOrUpdateSubscription = async(subscription, room) => { } }; +const debouncedUpdateSub = (subscription) => { + if (!subTimer) { + subTimer = setTimeout(() => { + const subBatch = subQueue; + subQueue = {}; + subTimer = null; + Object.keys(subBatch).forEach((key) => { + createOrUpdateSubscription(subBatch[key]); + }); + }, WINDOW_TIME); + } + subQueue[subscription.rid] = subscription; +}; + +const debouncedUpdateRoom = (room) => { + if (!roomTimer) { + roomTimer = setTimeout(() => { + const roomBatch = roomQueue; + roomQueue = {}; + roomTimer = null; + Object.keys(roomBatch).forEach((key) => { + createOrUpdateSubscription(null, roomBatch[key]); + }); + }, WINDOW_TIME); + } + roomQueue[room._id] = room; +}; + export default function subscribeRooms() { const handleConnection = () => { store.dispatch(roomsRequest()); @@ -202,12 +235,12 @@ export default function subscribeRooms() { log(e); } } else { - await createOrUpdateSubscription(data); + debouncedUpdateSub(data); } } if (/rooms/.test(ev)) { if (type === 'updated' || type === 'inserted') { - await createOrUpdateSubscription(null, data); + debouncedUpdateRoom(data); } } if (/message/.test(ev)) { @@ -257,6 +290,16 @@ export default function subscribeRooms() { streamListener.then(removeListener); streamListener = false; } + subQueue = {}; + roomQueue = {}; + if (subTimer) { + clearTimeout(subTimer); + subTimer = false; + } + if (roomTimer) { + clearTimeout(roomTimer); + roomTimer = false; + } }; connectedListener = this.sdk.onStreamData('connected', handleConnection); diff --git a/app/sagas/rooms.js b/app/sagas/rooms.js index 7476724bd4..641a75e53e 100644 --- a/app/sagas/rooms.js +++ b/app/sagas/rooms.js @@ -11,6 +11,8 @@ import database from '../lib/database'; import log from '../utils/log'; import mergeSubscriptionsRooms from '../lib/methods/helpers/mergeSubscriptionsRooms'; import RocketChat from '../lib/rocketchat'; +import buildMessage from '../lib/methods/helpers/buildMessage'; +import protectedFunction from '../lib/methods/helpers/protectedFunction'; const updateRooms = function* updateRooms({ server, newRoomsUpdatedAt }) { const serversDB = database.servers; @@ -38,6 +40,7 @@ const handleRoomsRequest = function* handleRoomsRequest() { const db = database.active; const subCollection = db.collections.get('subscriptions'); + const messagesCollection = db.collections.get('messages'); if (subscriptions.length) { const subsIds = subscriptions.map(sub => sub.rid); @@ -46,6 +49,14 @@ const handleRoomsRequest = function* handleRoomsRequest() { const subsToCreate = subscriptions.filter(i1 => !existingSubs.find(i2 => i1._id === i2._id)); // TODO: subsToDelete? + const lastMessages = subscriptions + .map(sub => sub.lastMessage && buildMessage(sub.lastMessage)) + .filter(lm => lm); + const lastMessagesIds = lastMessages.map(lm => lm._id); + const existingMessages = yield messagesCollection.query(Q.where('id', Q.oneOf(lastMessagesIds))).fetch(); + const messagesToUpdate = existingMessages.filter(i1 => lastMessages.find(i2 => i1.id === i2._id)); + const messagesToCreate = lastMessages.filter(i1 => !existingMessages.find(i2 => i1._id === i2.id)); + const allRecords = [ ...subsToCreate.map(subscription => subCollection.prepareCreate((s) => { s._raw = sanitizedRaw({ id: subscription.rid }, subCollection.schema); @@ -56,6 +67,17 @@ const handleRoomsRequest = function* handleRoomsRequest() { return subscription.prepareUpdate(() => { Object.assign(subscription, newSub); }); + }), + ...messagesToCreate.map(message => messagesCollection.prepareCreate(protectedFunction((m) => { + m._raw = sanitizedRaw({ id: message._id }, messagesCollection.schema); + m.subscription.id = message.rid; + return Object.assign(m, message); + }))), + ...messagesToUpdate.map((message) => { + const newMessage = lastMessages.find(m => m._id === message.id); + return message.prepareUpdate(protectedFunction(() => { + Object.assign(message, newMessage); + })); }) ];