Skip to content

Commit

Permalink
[lib] Take operations from the queue when conditions become met
Browse files Browse the repository at this point in the history
Summary:
Check the conditions, take from the queues, process and dispatch actions.

https://linear.app/comm/issue/ENG-9189/introduce-additional-operation-queues

Depends on D13366

Test Plan:
Tested the whole stack at once:
1. Created an operation with a reaction that was received before an operation creating the message
2. Created an operation with an edit that was received before an operation creating the message
3. Created an operation with an entry edit that was received before an operation creating the entry
4. Created an operation with thread subscription change that was received before the user became a member
In each of the cases verified that the result was correct and that the operation was removed from the queue.

Reviewers: kamil, will

Reviewed By: kamil

Subscribers: ashoat

Differential Revision: https://phab.comm.dev/D13375
  • Loading branch information
palys-swm committed Sep 19, 2024
1 parent 25b472e commit ebe8eb9
Showing 1 changed file with 155 additions and 14 deletions.
169 changes: 155 additions & 14 deletions lib/shared/dm-ops/dm-ops-queue-handler.react.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@ import * as React from 'react';

import { dmOperationSpecificationTypes } from './dm-op-utils.js';
import { useProcessDMOperation } from './process-dm-ops.js';
import { threadInfoSelector } from '../../selectors/thread-selectors.js';
import { messageInfoSelector } from '../../selectors/chat-selectors.js';
import {
entryInfoSelector,
threadInfoSelector,
} from '../../selectors/thread-selectors.js';
import {
clearQueuedEntryDMOpsActionType,
clearQueuedMembershipDMOpsActionType,
clearQueuedMessageDMOpsActionType,
clearQueuedThreadDMOpsActionType,
pruneDMOpsQueueActionType,
} from '../../types/dm-ops.js';
import type { OperationsQueue } from '../../types/dm-ops.js';
import { threadTypeIsThick } from '../../types/thread-types-enum.js';
import { useDispatch, useSelector } from '../../utils/redux-utils.js';

const PRUNING_FREQUENCY = 60 * 60 * 1000;
Expand Down Expand Up @@ -40,26 +49,24 @@ function DMOpsQueueHandler(): React.Node {

const threadInfos = useSelector(threadInfoSelector);
const threadIDs = React.useMemo(
() => new Set(Object.keys(threadInfos)),
() =>
new Set(
Object.entries(threadInfos)
.filter(([, threadInfo]) => threadTypeIsThick(threadInfo.type))
.map(([id]) => id),
),
[threadInfos],
);
const prevThreadIDsRef = React.useRef<$ReadOnlySet<string>>(new Set());

const queuedOperations = useSelector(
const queuedThreadOperations = useSelector(
state => state.queuedDMOperations.threadQueue,
);

const processDMOperation = useProcessDMOperation();

React.useEffect(() => {
const prevThreadIDs = prevThreadIDsRef.current;
prevThreadIDsRef.current = threadIDs;

for (const threadID in queuedOperations) {
if (!threadIDs.has(threadID) || prevThreadIDs.has(threadID)) {
continue;
}
for (const dmOp of queuedOperations[threadID]) {
const processOperationsQueue = React.useCallback(
(queue: OperationsQueue) => {
for (const dmOp of queue) {
void processDMOperation({
// This is `INBOUND` because we assume that when generating
// `dmOperationSpecificationTypes.OUBOUND` it should be possible
Expand All @@ -71,15 +78,149 @@ function DMOpsQueueHandler(): React.Node {
metadata: null,
});
}
},
[processDMOperation],
);

React.useEffect(() => {
const prevThreadIDs = prevThreadIDsRef.current;
prevThreadIDsRef.current = threadIDs;

for (const threadID in queuedThreadOperations) {
if (!threadIDs.has(threadID) || prevThreadIDs.has(threadID)) {
continue;
}
processOperationsQueue(queuedThreadOperations[threadID]);
dispatch({
type: clearQueuedThreadDMOpsActionType,
payload: {
threadID,
},
});
}
}, [dispatch, processDMOperation, queuedOperations, threadIDs]);
}, [dispatch, processOperationsQueue, queuedThreadOperations, threadIDs]);

const messageInfos = useSelector(messageInfoSelector);
const messageIDs = React.useMemo(
() =>
new Set(
Object.entries(messageInfos)
.filter(
([, messageInfo]) =>
messageInfo &&
threadTypeIsThick(threadInfos[messageInfo.threadID]?.type),
)
.map(([id]) => id),
),
[messageInfos, threadInfos],
);
const prevMessageIDsRef = React.useRef<$ReadOnlySet<string>>(new Set());

const queuedMessageOperations = useSelector(
state => state.queuedDMOperations.messageQueue,
);

React.useEffect(() => {
const prevMessageIDs = prevMessageIDsRef.current;
prevMessageIDsRef.current = messageIDs;

for (const messageID in queuedMessageOperations) {
if (!messageIDs.has(messageID) || prevMessageIDs.has(messageID)) {
continue;
}
processOperationsQueue(queuedMessageOperations[messageID]);
dispatch({
type: clearQueuedMessageDMOpsActionType,
payload: {
messageID,
},
});
}
}, [dispatch, messageIDs, processOperationsQueue, queuedMessageOperations]);

const entryInfos = useSelector(entryInfoSelector);
const entryIDs = React.useMemo(
() =>
new Set(
Object.entries(entryInfos)
.filter(([, entryInfo]) =>
threadTypeIsThick(threadInfos[entryInfo.threadID]?.type),
)
.map(([id]) => id),
),
[entryInfos, threadInfos],
);
const prevEntryIDsRef = React.useRef<$ReadOnlySet<string>>(new Set());

const queuedEntryOperations = useSelector(
state => state.queuedDMOperations.entryQueue,
);

React.useEffect(() => {
const prevEntryIDs = prevEntryIDsRef.current;
prevEntryIDsRef.current = entryIDs;

for (const entryID in queuedEntryOperations) {
if (!entryIDs.has(entryID) || prevEntryIDs.has(entryID)) {
continue;
}
processOperationsQueue(queuedEntryOperations[entryID]);
dispatch({
type: clearQueuedEntryDMOpsActionType,
payload: {
entryID,
},
});
}
}, [dispatch, entryIDs, processOperationsQueue, queuedEntryOperations]);

const queuedMembershipOperations = useSelector(
state => state.queuedDMOperations.membershipQueue,
);

const runningMembershipOperations = React.useRef<Map<string, Set<string>>>(
new Map(),
);
React.useEffect(() => {
for (const threadID in queuedMembershipOperations) {
if (!threadInfos[threadID]) {
continue;
}

const queuedMemberIDs = new Set(
Object.keys(queuedMembershipOperations[threadID]),
);
if (!runningMembershipOperations.current.has(threadID)) {
runningMembershipOperations.current.set(threadID, new Set());
}
for (const member of threadInfos[threadID].members) {
if (
!queuedMemberIDs.has(member.id) ||
runningMembershipOperations.current.get(threadID)?.has(member.id)
) {
continue;
}

runningMembershipOperations.current.get(threadID)?.add(member.id);

processOperationsQueue(queuedMembershipOperations[threadID][member.id]);

dispatch({
type: clearQueuedMembershipDMOpsActionType,
payload: {
threadID,
userID: member.id,
},
});
runningMembershipOperations.current.get(threadID)?.delete(member.id);
}
}
}, [
dispatch,
processOperationsQueue,
queuedMembershipOperations,
threadInfos,
]);

return null;
}
Expand Down

0 comments on commit ebe8eb9

Please sign in to comment.