Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Regression: Replace the Omnichannel queue model observe with Stream #16999

Merged
merged 10 commits into from
Mar 25, 2020
4 changes: 2 additions & 2 deletions app/livechat/client/lib/stream/inquiry.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Meteor } from 'meteor/meteor';

import { LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER } from '../../../lib/stream/constants';
import { LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER } from '../../../lib/stream/constants';

export const inquiryDataStream = new Meteor.Streamer(LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER);
export const inquiryDataStream = new Meteor.Streamer(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER);
27 changes: 11 additions & 16 deletions app/livechat/client/lib/stream/queueManager.js
Original file line number Diff line number Diff line change
@@ -1,39 +1,36 @@
import { Meteor } from 'meteor/meteor';

import { APIClient } from '../../../../utils/client';
import { getLivechatInquiryCollection } from '../../collections/LivechatInquiry';
import { LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER } from '../../../lib/stream/constants';
import { inquiryDataStream } from './inquiry';
import { hasRole } from '../../../../authorization/client';

const livechatQueueStreamer = new Meteor.Streamer('livechat-queue-stream');
let agentDepartments = [];
const collection = getLivechatInquiryCollection();

const events = {
added: (inquiry, collection) => {
added: (inquiry) => {
delete inquiry.type;
collection.insert(inquiry);
},
changed: (inquiry, collection) => {
changed: (inquiry) => {
if (inquiry.status !== 'queued' || (inquiry.department && !agentDepartments.includes(inquiry.department))) {
return collection.remove(inquiry._id);
}
delete inquiry.type;
collection.upsert({ _id: inquiry._id }, inquiry);
},
removed: (inquiry, collection) => collection.remove(inquiry._id),
removed: (inquiry) => collection.remove(inquiry._id),
};

const appendListenerToDepartment = (departmentId, collection) => livechatQueueStreamer.on(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ departmentId }`, (inquiry) => events[inquiry.type](inquiry, collection));

const removeListenerOfDepartment = (departmentId) => livechatQueueStreamer.removeListener(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ departmentId }`);
const updateCollection = (inquiry) => { events[inquiry.type](inquiry); };
const appendListenerToDepartment = (departmentId) => inquiryDataStream.on(`department/${ departmentId }`, updateCollection);
const removeListenerOfDepartment = (departmentId) => inquiryDataStream.removeListener(`department/${ departmentId }`, updateCollection);

const getInquiriesFromAPI = async (url) => {
const { inquiries } = await APIClient.v1.get(url);
return inquiries;
};

const updateInquiries = async (inquiries) => {
const collection = getLivechatInquiryCollection();
(inquiries || []).forEach((inquiry) => collection.upsert({ _id: inquiry._id }, inquiry));
};

Expand All @@ -43,9 +40,8 @@ const getAgentsDepartments = async (userId) => {
};

const addListenerForeachDepartment = async (userId, departments) => {
const collection = getLivechatInquiryCollection();
if (departments && Array.isArray(departments) && departments.length) {
departments.forEach((department) => appendListenerToDepartment(department, collection));
departments.forEach((department) => appendListenerToDepartment(department));
}
};

Expand All @@ -54,11 +50,10 @@ const removeDepartmentsListeners = (departments) => {
};

const removeGlobalListener = () => {
livechatQueueStreamer.removeListener(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER);
inquiryDataStream.removeListener('public', updateCollection);
};

export const initializeLivechatInquiryStream = async (userId) => {
const collection = getLivechatInquiryCollection();
collection.remove({});
if (agentDepartments.length) {
removeDepartmentsListeners(agentDepartments);
Expand All @@ -68,6 +63,6 @@ export const initializeLivechatInquiryStream = async (userId) => {
agentDepartments = (await getAgentsDepartments(userId)).map((department) => department.departmentId);
await addListenerForeachDepartment(userId, agentDepartments);
if (agentDepartments.length === 0 || hasRole(userId, 'livechat-manager')) {
livechatQueueStreamer.on(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, (inquiry) => events[inquiry.type](inquiry, collection));
inquiryDataStream.on('public', updateCollection);
}
};
9 changes: 5 additions & 4 deletions app/livechat/client/views/app/livechatReadOnly.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ Template.livechatReadOnly.onCreated(function() {
this.routingConfig = new ReactiveVar({});
this.preparing = new ReactiveVar(true);

this.updateInquiry = async (inquiry) => {
this.inquiry.set(inquiry);
if (!await call('canAccessRoom', inquiry.rid, Meteor.userId())) {
FlowRouter.go('/home');
this.updateInquiry = async ({ clientAction, ...inquiry }) => {
if (clientAction === 'removed' || !await call('canAccessRoom', inquiry.rid, Meteor.userId())) {
return FlowRouter.go('/home');
}

this.inquiry.set(inquiry);
};

Meteor.call('livechat:getRoutingConfig', (err, config) => {
Expand Down
1 change: 0 additions & 1 deletion app/livechat/lib/stream/constants.js
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export const LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER = 'livechat-inquiry-queue-observer';
export const LIVECHAT_INQUIRY_DATA_STREAM_OBSERVER = 'livechat-inquiry-data-observer';
1 change: 0 additions & 1 deletion app/livechat/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ import './lib/routing/External';
import './lib/routing/ManualSelection';
import './lib/routing/AutoSelection';
import './lib/stream/departmentAgents';
import './lib/stream/inquiry';
import './lib/stream/queueManager';
import './sendMessageBySMS';
import './unclosedLivechats';
Expand Down
34 changes: 0 additions & 34 deletions app/livechat/server/lib/stream/inquiry.js

This file was deleted.

64 changes: 35 additions & 29 deletions app/livechat/server/lib/stream/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,44 @@ import { hasPermission } from '../../../../authorization/server';
import { LivechatInquiry } from '../../../../models/server';
import { LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER } from '../../../lib/stream/constants';

const livechatQueueStreamer = new Meteor.Streamer('livechat-queue-stream');
livechatQueueStreamer.allowWrite('none');
livechatQueueStreamer.allowRead(function() {
const queueDataStreamer = new Meteor.Streamer(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER);
queueDataStreamer.allowWrite('none');
queueDataStreamer.allowRead(function() {
return this.userId ? hasPermission(this.userId, 'view-l-room') : false;
});

const emitEvent = (event, data) => livechatQueueStreamer.emit(event, data);
const emitQueueDataEvent = (event, data) => queueDataStreamer.emit(event, data);
const mountDataToEmit = (type, data) => ({ type, ...data });

LivechatInquiry.find({}).observeChanges({
added(_id, record) {
if (record && record.department) {
return emitEvent(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ record.department }`, mountDataToEmit('added', { ...record, _id }));
}
emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('added', { ...record, _id }));
},
changed(_id, record) {
const isUpdatingDepartment = record && record.department;
const inquiry = LivechatInquiry.findOneById(_id);
if (inquiry && !inquiry.department) {
return emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('changed', inquiry));
}
if (isUpdatingDepartment) {
emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('changed', inquiry));
}
return emitEvent(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ inquiry.department }`, mountDataToEmit('changed', inquiry));
},
removed(_id) {
const inquiry = LivechatInquiry.trashFindOneById(_id);
if (inquiry && inquiry.department) {
return emitEvent(`${ LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER }/${ inquiry.department }`, mountDataToEmit('removed', { _id }));
}
emitEvent(LIVECHAT_INQUIRY_QUEUE_STREAM_OBSERVER, mountDataToEmit('removed', { _id }));
},
LivechatInquiry.on('change', ({ clientAction, id: _id, data: record }) => {
switch (clientAction) {
case 'inserted':
emitQueueDataEvent(_id, { ...record, clientAction });
if (record && record.department) {
return emitQueueDataEvent(`department/${ record.department }`, mountDataToEmit('added', record));
}
emitQueueDataEvent('public', mountDataToEmit('added', record));
break;
case 'updated':
const isUpdatingDepartment = record && record.department;
const updatedRecord = LivechatInquiry.findOneById(_id);
emitQueueDataEvent(_id, { ...updatedRecord, clientAction });
if (updatedRecord && !updatedRecord.department) {
return emitQueueDataEvent('public', mountDataToEmit('changed', updatedRecord));
}
if (isUpdatingDepartment) {
emitQueueDataEvent('public', mountDataToEmit('changed', updatedRecord));
}
emitQueueDataEvent(`department/${ updatedRecord.department }`, mountDataToEmit('changed', updatedRecord));
break;

case 'removed':
const removedRecord = LivechatInquiry.trashFindOneById(_id);
emitQueueDataEvent(_id, { _id, clientAction });
if (removedRecord && removedRecord.department) {
return emitQueueDataEvent(`department/${ removedRecord.department }`, mountDataToEmit('removed', { _id }));
}
emitQueueDataEvent('public', mountDataToEmit('removed', { _id }));
break;
}
});