Skip to content

Commit

Permalink
refactor: lazy lookup table
Browse files Browse the repository at this point in the history
  • Loading branch information
arnautov-anton committed Aug 29, 2024
1 parent 656860a commit 4567993
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 35 deletions.
63 changes: 33 additions & 30 deletions src/thread_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export type ThreadManagerState<SCG extends ExtendableGenerics = DefaultGenerics>
pagination: ThreadManagerPagination;
ready: boolean;
threads: Thread<SCG>[];
threadsById: Record<string, Thread<SCG> | undefined>;
unreadThreadsCount: number;
/**
* List of threads that haven't been loaded in the list, but have received new messages
Expand All @@ -33,14 +32,17 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
public readonly state: StateStore<ThreadManagerState<SCG>>;
private client: StreamChat<SCG>;
private unsubscribeFunctions: Set<() => void> = new Set();
private threadsByIdGetterCache: {
threads: ThreadManagerState<SCG>['threads'];
threadsById: { [key: string]: Thread<SCG> };
};

constructor({ client }: { client: StreamChat<SCG> }) {
this.client = client;
this.state = new StateStore<ThreadManagerState<SCG>>({
active: false,
isThreadOrderStale: false,
threads: [],
threadsById: {},
unreadThreadsCount: 0,
unseenThreadIds: [],
lastConnectionDropAt: null,
Expand All @@ -51,6 +53,26 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
},
ready: false,
});

this.threadsByIdGetterCache = { threads: [], threadsById: {} };
}

public get threadsById() {
const { threads } = this.state.getLatestValue();

if (threads === this.threadsByIdGetterCache.threads) {
return this.threadsByIdGetterCache.threadsById;
}

const threadsById = threads.reduce<Record<string, Thread<SCG>>>((newThreadsById, thread) => {
newThreadsById[thread.id] = thread;
return newThreadsById;
}, {});

this.threadsByIdGetterCache.threads = threads;
this.threadsByIdGetterCache.threadsById = threadsById;

return threadsById;
}

public activate = () => {
Expand Down Expand Up @@ -92,12 +114,12 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {

private subscribeManageThreadSubscriptions = () =>
this.state.subscribeWithSelector(
(nextValue) => [nextValue.threads, nextValue.threadsById] as const,
([nextThreads, nextThreadsById], prev) => {
(nextValue) => [nextValue.threads] as const,
([nextThreads], prev) => {
const [prevThreads = []] = prev ?? [];
// Thread instance was removed if there's no thread with the given id at all,
// or it was replaced with a new instance
const removedThreads = prevThreads.filter((thread) => thread !== nextThreadsById[thread.id]);
const removedThreads = prevThreads.filter((thread) => thread !== this.threadsById[thread.id]);

nextThreads.forEach((thread) => thread.registerSubscriptions());
removedThreads.forEach((thread) => thread.unregisterSubscriptions());
Expand All @@ -117,10 +139,10 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
const parentId = event.message?.parent_id;
if (!parentId) return;

const { threadsById, unseenThreadIds, ready } = this.state.getLatestValue();
const { unseenThreadIds, ready } = this.state.getLatestValue();
if (!ready) return;

if (threadsById[parentId]) {
if (this.threadsById[parentId]) {
this.state.partialNext({ isThreadOrderStale: true });
} else if (!unseenThreadIds.includes(parentId)) {
this.state.partialNext({ unseenThreadIds: unseenThreadIds.concat(parentId) });
Expand Down Expand Up @@ -182,7 +204,8 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
}));

const response = await this.queryThreads({ limit: Math.min(limit, MAX_QUERY_THREADS_LIMIT) });
const { threadsById: currentThreads } = this.state.getLatestValue();

const currentThreads = this.threadsById;
const nextThreads: Thread<SCG>[] = [];

for (const incomingThread of response.threads) {
Expand All @@ -201,7 +224,7 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {

this.state.next((current) => ({
...current,
...prepareThreadsUpdate(current, nextThreads),
threads: nextThreads,
unseenThreadIds: [],
isThreadOrderStale: false,
pagination: {
Expand Down Expand Up @@ -248,10 +271,7 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {

this.state.next((current) => ({
...current,
...prepareThreadsUpdate(
current,
response.threads.length ? current.threads.concat(response.threads) : current.threads,
),
threads: response.threads.length ? current.threads.concat(response.threads) : current.threads,
pagination: {
...current.pagination,
nextCursor: response.next ?? null,
Expand All @@ -270,20 +290,3 @@ export class ThreadManager<SCG extends ExtendableGenerics = DefaultGenerics> {
}
};
}

function prepareThreadsUpdate<SCG extends ExtendableGenerics = DefaultGenerics>(
state: ThreadManagerState<SCG>,
threads: Thread<SCG>[],
): Partial<ThreadManagerState<SCG>> {
if (threads === state.threads) {
return {};
}

return {
threads,
threadsById: threads.reduce<Record<string, Thread<SCG>>>((newThreadsById, thread) => {
newThreadsById[thread.id] = thread;
return newThreadsById;
}, {}),
};
}
29 changes: 24 additions & 5 deletions test/unit/threads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,6 @@ describe('Threads 2.0', () => {
const thread = createTestThread();
threadManager.state.partialNext({
threads: [thread],
threadsById: { [thread.id]: thread },
ready: true,
});

Expand Down Expand Up @@ -962,15 +961,13 @@ describe('Threads 2.0', () => {

threadManager.state.partialNext({
threads: [thread1, thread2],
threadsById: { [thread1.id]: thread1, [thread2.id]: thread2 },
});

expect(registerThread1.calledOnce).to.be.true;
expect(registerThread2.calledOnce).to.be.true;

threadManager.state.partialNext({
threads: [thread2, thread3],
threadsById: { [thread2.id]: thread2, [thread3.id]: thread3 },
});

expect(unregisterThread1.calledOnce).to.be.true;
Expand All @@ -984,7 +981,7 @@ describe('Threads 2.0', () => {
});
});

describe('Methods', () => {
describe('Methods & Getters', () => {
let stubbedQueryThreads: sinon.SinonStub<
Parameters<StreamChat['queryThreads']>,
ReturnType<StreamChat['queryThreads']>
Expand All @@ -997,6 +994,29 @@ describe('Threads 2.0', () => {
});
});

describe('threadsById', () => {
it('lazily generates & re-generates a proper lookup table', () => {
const thread1 = createTestThread({ parentMessageOverrides: { id: uuidv4() } });
const thread2 = createTestThread({ parentMessageOverrides: { id: uuidv4() } });
const thread3 = createTestThread({ parentMessageOverrides: { id: uuidv4() } });

threadManager.state.partialNext({ threads: [thread1, thread2] });
const state1 = threadManager.state.getLatestValue();

expect(state1.threads).to.have.lengthOf(2);
expect(Object.keys(threadManager.threadsById)).to.have.lengthOf(2);
expect(threadManager.threadsById).to.have.keys(thread1.id, thread2.id);

threadManager.state.partialNext({ threads: [thread3] });
const state2 = threadManager.state.getLatestValue();

expect(state2.threads).to.have.lengthOf(1);
expect(Object.keys(threadManager.threadsById)).to.have.lengthOf(1);
expect(threadManager.threadsById).to.have.keys(thread3.id);
expect(threadManager.threadsById[thread3.id]).to.equal(thread3);
});
});

describe('reload', () => {
it('skips reload if there were no updates since the latest reload', async () => {
threadManager.state.partialNext({ ready: true });
Expand Down Expand Up @@ -1074,7 +1094,6 @@ describe('Threads 2.0', () => {
});
threadManager.state.partialNext({
threads: [existingThread],
threadsById: { [existingThread.id]: existingThread },
unseenThreadIds: [newThread.id],
});
stubbedQueryThreads.resolves({
Expand Down

0 comments on commit 4567993

Please sign in to comment.