diff --git a/app/lib/methods/subscriptions/room.js b/app/lib/methods/subscriptions/room.js index bada34e3fa..0c17f2ec7d 100644 --- a/app/lib/methods/subscriptions/room.js +++ b/app/lib/methods/subscriptions/room.js @@ -13,7 +13,7 @@ import RocketChat from '../../rocketchat'; let queue = {}; let timer = null; -const WINDOW_TIME = 2000; +const WINDOW_TIME = 500; export default class RoomSubscription { constructor(rid) { @@ -138,15 +138,13 @@ export default class RoomSubscription { RocketChat.readMessages(this.rid, lastOpen); }, 300); - callMessageReceived = async(message, lastOpen) => { - return new Promise(async(resolve, reject) => { - console.log('TCL: RoomSubscription -> callMessageReceived -> message', message); - // if (this.rid !== message.rid) { - // return; - // } + updateMessage = message => ( + new Promise(async(resolve) => { + if (this.rid !== message.rid) { + return; + } const db = database.active; - const batch = []; const msgCollection = db.collections.get('messages'); const threadsCollection = db.collections.get('threads'); const threadMessagesCollection = db.collections.get('thread_messages'); @@ -161,23 +159,11 @@ export default class RoomSubscription { // Do nothing } if (messageRecord) { - try { - const update = messageRecord.prepareUpdate((m) => { - Object.assign(m, message); - }); - this.messagesBatch[message._id] = update; - // batch.push(update); - } catch (e) { - console.log(e); - } + const update = messageRecord.prepareUpdate((m) => { + Object.assign(m, message); + }); + this.messagesBatch[message._id] = update; } else { - // batch.push( - // msgCollection.prepareCreate(protectedFunction((m) => { - // m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema); - // m.subscription.id = this.rid; - // Object.assign(m, message); - // })) - // ); const create = msgCollection.prepareCreate(protectedFunction((m) => { m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema); m.subscription.id = this.rid; @@ -185,7 +171,7 @@ export default class RoomSubscription { })); this.messagesBatch[message._id] = create; } - + // Create or update thread if (message.tlm) { try { @@ -193,25 +179,13 @@ export default class RoomSubscription { } catch (error) { // Do nothing } - + if (threadRecord) { - // batch.push( - // threadRecord.prepareUpdate(protectedFunction((t) => { - // Object.assign(t, message); - // })) - // ); const updateThread = threadRecord.prepareUpdate(protectedFunction((t) => { Object.assign(t, message); })); this.threadsBatch[message._id] = updateThread; } else { - // batch.push( - // threadsCollection.prepareCreate(protectedFunction((t) => { - // t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema); - // t.subscription.id = this.rid; - // Object.assign(t, message); - // })) - // ); const createThread = threadsCollection.prepareCreate(protectedFunction((t) => { t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema); t.subscription.id = this.rid; @@ -230,13 +204,6 @@ export default class RoomSubscription { } if (threadMessageRecord) { - // batch.push( - // threadMessageRecord.prepareUpdate(protectedFunction((tm) => { - // Object.assign(tm, message); - // tm.rid = message.tmid; - // delete tm.tmid; - // })) - // ); const updateThreadMessage = threadMessageRecord.prepareUpdate(protectedFunction((tm) => { Object.assign(tm, message); tm.rid = message.tmid; @@ -244,15 +211,6 @@ export default class RoomSubscription { })); this.threadMessagesBatch[message._id] = updateThreadMessage; } else { - // batch.push( - // threadMessagesCollection.prepareCreate(protectedFunction((tm) => { - // tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema); - // Object.assign(tm, message); - // tm.subscription.id = this.rid; - // tm.rid = message.tmid; - // delete tm.tmid; - // })) - // ); const createThreadMessage = threadMessagesCollection.prepareCreate(protectedFunction((tm) => { tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema); Object.assign(tm, message); @@ -264,53 +222,35 @@ export default class RoomSubscription { } } - // this.read(lastOpen); - return resolve('FOI FOI FOI FOI FOI FOI FOI FOI FOI '); - - // try { - // await db.action(async() => { - // await db.batch(...batch); - // }); - // return resolve('FOI FOI FOI FOI FOI FOI FOI FOI FOI '); - // } catch (e) { - // log(e); - // } - }); - }; + return resolve(); + }) + ) handleMessageReceived = (ddpMessage) => { - console.log('TCL: RoomSubscription -> handleMessageReceived -> ddpMessage', ddpMessage); if (!timer) { timer = setTimeout(async() => { const innerQueue = Object.keys(queue).map(key => queue[key]); - console.log('TCL: RoomSubscription -> timer -> innerQueue', innerQueue); const innerLastOpen = this.lastOpen; queue = {}; timer = null; for (let i = 0; i < innerQueue.length; i += 1) { try { // eslint-disable-next-line no-await-in-loop - const r = await this.callMessageReceived(innerQueue[i], innerLastOpen); - console.log('TCL: RoomSubscription -> timer -> r', r); + await this.updateMessage(innerQueue[i]); } catch (e) { - console.log('TCL: RoomSubscription -> timer -> e', e); + log(e); } } try { - console.log('BEFOOOOOOOOOOOORE') const db = database.active; - console.log(this.messagesBatch) - console.log(this.threadsBatch) - console.log(this.threadMessagesBatch) await db.action(async() => { await db.batch( ...Object.values(this.messagesBatch), ...Object.values(this.threadsBatch), - ...Object.values(this.threadMessagesBatch), + ...Object.values(this.threadMessagesBatch) ); }); - console.log('AFTEEEEEEEEEEEER') this.messagesBatch = {}; this.threadsBatch = {}; this.threadMessagesBatch = {};