Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
diegolmello committed Feb 19, 2020
1 parent b21f012 commit 2fbc90e
Showing 1 changed file with 18 additions and 78 deletions.
96 changes: 18 additions & 78 deletions app/lib/methods/subscriptions/room.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import RocketChat from '../../rocketchat';

let queue = {};
let timer = null;
const WINDOW_TIME = 2000;
const WINDOW_TIME = 500;

export default class RoomSubscription {
constructor(rid) {
Expand Down Expand Up @@ -138,15 +138,13 @@ export default class RoomSubscription {
RocketChat.readMessages(this.rid, lastOpen);
}, 300);

callMessageReceived = async(message, lastOpen) => {
return new Promise(async(resolve, reject) => {
console.log('TCL: RoomSubscription -> callMessageReceived -> message', message);
// if (this.rid !== message.rid) {
// return;
// }
updateMessage = message => (
new Promise(async(resolve) => {
if (this.rid !== message.rid) {
return;
}

const db = database.active;
const batch = [];
const msgCollection = db.collections.get('messages');
const threadsCollection = db.collections.get('threads');
const threadMessagesCollection = db.collections.get('thread_messages');
Expand All @@ -161,57 +159,33 @@ export default class RoomSubscription {
// Do nothing
}
if (messageRecord) {
try {
const update = messageRecord.prepareUpdate((m) => {
Object.assign(m, message);
});
this.messagesBatch[message._id] = update;
// batch.push(update);
} catch (e) {
console.log(e);
}
const update = messageRecord.prepareUpdate((m) => {
Object.assign(m, message);
});
this.messagesBatch[message._id] = update;
} else {
// batch.push(
// msgCollection.prepareCreate(protectedFunction((m) => {
// m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
// m.subscription.id = this.rid;
// Object.assign(m, message);
// }))
// );
const create = msgCollection.prepareCreate(protectedFunction((m) => {
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
m.subscription.id = this.rid;
Object.assign(m, message);
}));
this.messagesBatch[message._id] = create;
}

// Create or update thread
if (message.tlm) {
try {
threadRecord = await threadsCollection.find(message._id);
} catch (error) {
// Do nothing
}

if (threadRecord) {
// batch.push(
// threadRecord.prepareUpdate(protectedFunction((t) => {
// Object.assign(t, message);
// }))
// );
const updateThread = threadRecord.prepareUpdate(protectedFunction((t) => {
Object.assign(t, message);
}));
this.threadsBatch[message._id] = updateThread;
} else {
// batch.push(
// threadsCollection.prepareCreate(protectedFunction((t) => {
// t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
// t.subscription.id = this.rid;
// Object.assign(t, message);
// }))
// );
const createThread = threadsCollection.prepareCreate(protectedFunction((t) => {
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
t.subscription.id = this.rid;
Expand All @@ -230,29 +204,13 @@ export default class RoomSubscription {
}

if (threadMessageRecord) {
// batch.push(
// threadMessageRecord.prepareUpdate(protectedFunction((tm) => {
// Object.assign(tm, message);
// tm.rid = message.tmid;
// delete tm.tmid;
// }))
// );
const updateThreadMessage = threadMessageRecord.prepareUpdate(protectedFunction((tm) => {
Object.assign(tm, message);
tm.rid = message.tmid;
delete tm.tmid;
}));
this.threadMessagesBatch[message._id] = updateThreadMessage;
} else {
// batch.push(
// threadMessagesCollection.prepareCreate(protectedFunction((tm) => {
// tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
// Object.assign(tm, message);
// tm.subscription.id = this.rid;
// tm.rid = message.tmid;
// delete tm.tmid;
// }))
// );
const createThreadMessage = threadMessagesCollection.prepareCreate(protectedFunction((tm) => {
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
Object.assign(tm, message);
Expand All @@ -264,53 +222,35 @@ export default class RoomSubscription {
}
}

// this.read(lastOpen);
return resolve('FOI FOI FOI FOI FOI FOI FOI FOI FOI ');

// try {
// await db.action(async() => {
// await db.batch(...batch);
// });
// return resolve('FOI FOI FOI FOI FOI FOI FOI FOI FOI ');
// } catch (e) {
// log(e);
// }
});
};
return resolve();
})
)

handleMessageReceived = (ddpMessage) => {
console.log('TCL: RoomSubscription -> handleMessageReceived -> ddpMessage', ddpMessage);
if (!timer) {
timer = setTimeout(async() => {
const innerQueue = Object.keys(queue).map(key => queue[key]);
console.log('TCL: RoomSubscription -> timer -> innerQueue', innerQueue);
const innerLastOpen = this.lastOpen;
queue = {};
timer = null;
for (let i = 0; i < innerQueue.length; i += 1) {
try {
// eslint-disable-next-line no-await-in-loop
const r = await this.callMessageReceived(innerQueue[i], innerLastOpen);
console.log('TCL: RoomSubscription -> timer -> r', r);
await this.updateMessage(innerQueue[i]);
} catch (e) {
console.log('TCL: RoomSubscription -> timer -> e', e);
log(e);
}
}

try {
console.log('BEFOOOOOOOOOOOORE')
const db = database.active;
console.log(this.messagesBatch)
console.log(this.threadsBatch)
console.log(this.threadMessagesBatch)
await db.action(async() => {
await db.batch(
...Object.values(this.messagesBatch),
...Object.values(this.threadsBatch),
...Object.values(this.threadMessagesBatch),
...Object.values(this.threadMessagesBatch)
);
});
console.log('AFTEEEEEEEEEEEER')
this.messagesBatch = {};
this.threadsBatch = {};
this.threadMessagesBatch = {};
Expand Down

0 comments on commit 2fbc90e

Please sign in to comment.