Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: threads 2.0 #1330

Merged
merged 42 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7192054
Initial commit
arnautov-anton May 15, 2024
6c51d69
First MVP
arnautov-anton May 24, 2024
23bf70a
Adjustments to state handlers
arnautov-anton May 28, 2024
c2350c2
Add loadUnreadThreads and adjust logic
arnautov-anton Jun 10, 2024
501b8c1
New handlers and extending storage options
arnautov-anton Jun 15, 2024
f5b272f
New handlers and fixes to existing ones
arnautov-anton Jun 20, 2024
2c26384
Upgrade TS, adjust conf and code, add lodash/throttle
arnautov-anton Jun 20, 2024
ab49e93
New handlers and fixes
arnautov-anton Jun 25, 2024
ab16fda
New fixes, handlers and adjustments
arnautov-anton Jul 12, 2024
ba57e0a
New fixes, handlers and adjustments
arnautov-anton Jul 16, 2024
462eca3
Added some ThreadManager tests
arnautov-anton Jul 24, 2024
7b20568
Finalize ThreadManager tests
arnautov-anton Jul 24, 2024
f8a9ea3
Added new tests and fixes
arnautov-anton Jul 25, 2024
69fdaef
Drop lodash.throttle, use own implementation
arnautov-anton Jul 26, 2024
ff29795
Adjust tests and package.json
arnautov-anton Jul 31, 2024
fbc6361
Rename next/previous parameters to "cursors", adjust tests
arnautov-anton Aug 2, 2024
d547870
Upgrade Mocha
arnautov-anton Aug 5, 2024
7590add
Replace unsupported Array.toSpliced
arnautov-anton Aug 5, 2024
0e5d673
Adjust insertion index API
arnautov-anton Aug 6, 2024
bfd5ed5
Remove subscription registration from constructor
arnautov-anton Aug 13, 2024
6c50a9e
chore: thread and thread manager into separate file (#1336)
myandrienko Aug 14, 2024
a08ffb1
feat: do not store channelData in thread state (#1337)
myandrienko Aug 14, 2024
2916a21
fix: updates for state store
myandrienko Aug 21, 2024
20974a8
Adjust ThreadResponse type, remove unused properties, rename generic
arnautov-anton Aug 21, 2024
3d88ef7
fix: typo
myandrienko Aug 21, 2024
928e6a7
One test down
arnautov-anton Aug 21, 2024
e71b7bc
Register TM subscriptions before running tests
arnautov-anton Aug 21, 2024
2ac8fea
WIP
myandrienko Aug 21, 2024
10daa99
Return existingReorderedThreadIds functionality
arnautov-anton Aug 22, 2024
7994292
Remove unnecessary return type
arnautov-anton Aug 23, 2024
3358e12
fix: fix most TODOs and tests for Threads class (#1347)
myandrienko Aug 28, 2024
ffbe784
fix: fix most TODOs and tests for ThreadManager class (#1348)
myandrienko Aug 29, 2024
b63da35
Merge branch 'master' into feat/threads-v2
myandrienko Aug 29, 2024
ac3730f
fix: revert TS bump
myandrienko Aug 29, 2024
ba25d98
fix: revert some unrelated changes
myandrienko Aug 29, 2024
656860a
fix: optimize subscribeManageThreadSubscriptions
myandrienko Aug 29, 2024
a6d69ef
refactor: lazy lookup table
arnautov-anton Aug 29, 2024
082769f
Remove threadData from the Channel instantiation step
arnautov-anton Aug 30, 2024
ea02306
Fix missing initial unreadThreadCount
arnautov-anton Aug 30, 2024
23b70aa
fix: latest replies are always the last page
myandrienko Sep 2, 2024
5e0f99a
Fix tests, add missing TM test
arnautov-anton Sep 2, 2024
226c03d
fix: pagination tests
myandrienko Sep 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@
"eslint-plugin-typescript-sort-keys": "1.5.0",
"husky": "^4.3.8",
"lint-staged": "^15.2.2",
"mocha": "^9.1.3",
"mocha": "^10.7.0",
"nyc": "^15.1.0",
"prettier": "^2.2.1",
"rollup": "^2.41.0",
"rollup-plugin-peer-deps-external": "^2.2.4",
"rollup-plugin-terser": "^7.0.2",
"sinon": "^12.0.1",
"standard-version": "^9.3.2",
"typescript": "^4.2.3",
"typescript": "^5.5.4",
"uuid": "^8.3.2"
},
"scripts": {
Expand All @@ -118,7 +118,7 @@
"test-types": "node test/typescript/index.js && tsc --esModuleInterop true --noEmit true --strictNullChecks true --noImplicitAny true --strict true test/typescript/*.ts",
"eslint": "eslint '**/*.{js,md,ts}' --max-warnings 0 --ignore-path ./.eslintignore",
"eslint-fix": "npx eslint --fix '**/*.{js,md,ts}' --max-warnings 0 --ignore-path ./.eslintignore",
"test-unit": "NODE_ENV=test mocha --exit --bail --timeout 20000 --require ./babel-register test/unit/*.js",
"test-unit": "NODE_ENV=test mocha --exit --bail --timeout 20000 --require ./babel-register test/unit/*.{js,test.ts}",
"test-coverage": "nyc yarn test-unit",
"test": "yarn test-unit",
"testwatch": "NODE_ENV=test nodemon ./node_modules/.bin/mocha --timeout 20000 --require test-entry.js test/test.js",
Expand Down
1 change: 1 addition & 0 deletions src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
return response;
}

// TODO: move to thread
/**
* getReplies - List the message replies for a parent message
*
Expand Down
33 changes: 11 additions & 22 deletions src/channel_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
ReactionResponse,
UserResponse,
} from './types';
import { addToMessageList } from './utils';
import { addToMessageList, formatMessage } from './utils';

type ChannelReadStatus<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = Record<
string,
Expand Down Expand Up @@ -61,6 +61,7 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
isLatest: boolean;
messages: Array<ReturnType<ChannelState<StreamChatGenerics>['formatMessage']>>;
}[] = [];

constructor(channel: Channel<StreamChatGenerics>) {
this._channel = channel;
this.watcher_count = 0;
Expand Down Expand Up @@ -132,26 +133,13 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
}

/**
* formatMessage - Takes the message object. Parses the dates, sets __html
* and sets the status to received if missing. Returns a message object
*
* @param {MessageResponse<StreamChatGenerics>} message a message object
* Takes the message object, parses the dates, sets `__html`
* and sets the status to `received` if missing; returns a new message object.
*
* @param {MessageResponse<StreamChatGenerics>} message `MessageResponse` object
*/
formatMessage(message: MessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics> {
return {
...message,
/**
* @deprecated please use `html`
*/
__html: message.html,
// parse the date..
pinned_at: message.pinned_at ? new Date(message.pinned_at) : null,
created_at: message.created_at ? new Date(message.created_at) : new Date(),
updated_at: message.updated_at ? new Date(message.updated_at) : new Date(),
status: message.status || 'received',
};
}
formatMessage = (message: MessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics> =>
formatMessage<StreamChatGenerics>(message);

/**
* addMessagesSorted - Add the list of messages to state and resorts the messages
Expand Down Expand Up @@ -295,15 +283,16 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
this.pinnedMessages = result;
}

// TODO: clean this up
addReaction(
reaction: ReactionResponse<StreamChatGenerics>,
message?: MessageResponse<StreamChatGenerics>,
enforce_unique?: boolean,
enforceUnique?: boolean,
) {
if (!message) return;
const messageWithReaction = message;
this._updateMessage(message, (msg) => {
messageWithReaction.own_reactions = this._addOwnReactionToMessage(msg.own_reactions, reaction, enforce_unique);
messageWithReaction.own_reactions = this._addOwnReactionToMessage(msg.own_reactions, reaction, enforceUnique);
return this.formatMessage(messageWithReaction);
});
return messageWithReaction;
Expand Down Expand Up @@ -664,7 +653,7 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
messages[i] = {
...m,
type: 'deleted',
deleted_at: user.deleted_at,
deleted_at: user.deleted_at ? new Date(user.deleted_at) : null,
};
}
}
Expand Down
18 changes: 12 additions & 6 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ import {
import { InsightMetrics, postInsights } from './insights';
import { Thread } from './thread';
import { Moderation } from './moderation';
import { ThreadManager } from './thread_manager';

function isString(x: unknown): x is string {
return typeof x === 'string' || x instanceof String;
Expand All @@ -219,6 +220,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
activeChannels: {
[key: string]: Channel<StreamChatGenerics>;
};
threads: ThreadManager<StreamChatGenerics>;
anonymous: boolean;
persistUserOnConnectionFailure?: boolean;
axiosInstance: AxiosInstance;
Expand Down Expand Up @@ -338,6 +340,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.setUserPromise = null;
// keeps a reference to all the channels that are in use
this.activeChannels = {};

// mapping between channel groups and configs
this.configs = {};
this.anonymous = false;
Expand All @@ -349,8 +352,8 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.consecutiveFailures = 0;
this.insightMetrics = new InsightMetrics();

this.defaultWSTimeoutWithFallback = 6000;
this.defaultWSTimeout = 15000;
this.defaultWSTimeoutWithFallback = 6 * 1000;
this.defaultWSTimeout = 15 * 1000;

this.axiosInstance.defaults.paramsSerializer = axiosParamsSerializer;

Expand Down Expand Up @@ -404,6 +407,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
*/
this.logger = isFunction(inputOptions.logger) ? inputOptions.logger : () => null;
this.recoverStateOnReconnect = this.options.recoverStateOnReconnect;

// reusing the same name the channel has (Channel.threads)
this.threads = new ThreadManager({ client: this });
}

/**
Expand Down Expand Up @@ -604,7 +610,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
tags: ['connection', 'client'],
});

return Promise.resolve();
return;
}

this.clientID = `${this.userID}--${randomId()}`;
Expand Down Expand Up @@ -1432,7 +1438,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
return await this.wsConnection.connect(
this.options.enableWSFallback ? this.defaultWSTimeoutWithFallback : this.defaultWSTimeout,
);
} catch (err) {
} catch (err: any) {
// run fallback only if it's WS/Network error and not a normal API error
// make sure browser is online before even trying the longpoll
if (this.options.enableWSFallback && isWSFailure(err) && isOnline()) {
Expand Down Expand Up @@ -2721,7 +2727,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
const res = await this.post<QueryThreadsAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads`, opts);

return {
threads: res.threads.map((thread) => new Thread(this, thread)),
threads: res.threads.map((thread) => new Thread({ client: this, threadData: thread })),
next: res.next,
};
}
Expand Down Expand Up @@ -2751,7 +2757,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG

const res = await this.get<GetThreadAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads/${messageId}`, opts);

return new Thread<StreamChatGenerics>(this, res.thread);
return new Thread<StreamChatGenerics>({ client: this, threadData: res.thread });
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
this.consecutiveFailures = 0;

this._log(`connect() - Established ws connection with healthcheck: ${healthCheck}`);
} catch (error) {
} catch (error: any) {
this.isHealthy = false;
this.consecutiveFailures += 1;

Expand Down Expand Up @@ -148,7 +148,7 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
for (let i = 0; i <= timeout; i += interval) {
try {
return await this.connectionOpen;
} catch (error) {
} catch (error: any) {
if (i === timeout) {
throw new Error(
JSON.stringify({
Expand Down Expand Up @@ -298,7 +298,7 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
}
return response;
}
} catch (err) {
} catch (err: any) {
this.isConnecting = false;
this._log(`_connect() - Error - `, err);
if (this.client.options.enableInsights) {
Expand Down Expand Up @@ -366,7 +366,7 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
this._log('_reconnect() - Finished recoverCallBack');

this.consecutiveFailures = 0;
} catch (error) {
} catch (error: any) {
this.isHealthy = false;
this.consecutiveFailures += 1;
if (error.code === chatCodes.TOKEN_EXPIRED && !this.client.tokenManager.isStatic()) {
Expand Down
4 changes: 2 additions & 2 deletions src/connection_fallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics

this.consecutiveFailures = 0; // always reset in case of no error
return res;
} catch (err) {
} catch (err: any) {
this.consecutiveFailures += 1;

if (retry && isErrorRetryable(err)) {
Expand All @@ -107,7 +107,7 @@ export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics
this.client.dispatchEvent(data.events[i]);
}
}
} catch (err) {
} catch (err: any) {
if (axios.isCancel(err)) {
this._log(`_poll() - axios canceled request`);
return;
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './client_state';
export * from './channel';
export * from './channel_state';
export * from './thread';
export * from './thread_manager';
export * from './connection';
export * from './events';
export * from './moderation';
Expand All @@ -15,3 +16,4 @@ export * from './types';
export * from './segment';
export * from './campaign';
export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils';
export * from './store';
57 changes: 57 additions & 0 deletions src/store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
export type Patch<T> = (value: T) => T;
export type Handler<T> = (nextValue: T, previousValue: T | undefined) => void;
export type Unsubscibe = () => void;

function isPatch<T>(value: T | Patch<T>): value is Patch<T> {
return typeof value === 'function';
}

export class StateStore<T extends Record<string, unknown>> {
private handlerSet = new Set<Handler<T>>();

constructor(private value: T) {}

public next = (newValueOrPatch: T | Patch<T>): void => {
// newValue (or patch output) should never be mutated previous value
const newValue = isPatch(newValueOrPatch) ? newValueOrPatch(this.value) : newValueOrPatch;

// do not notify subscribers if the value hasn't changed
if (newValue === this.value) return;

const oldValue = this.value;
this.value = newValue;

this.handlerSet.forEach((handler) => handler(this.value, oldValue));
};

public partialNext = (partial: Partial<T>): void => this.next((current) => ({ ...current, ...partial }));

public getLatestValue = (): T => this.value;

public subscribe = (handler: Handler<T>): Unsubscibe => {
arnautov-anton marked this conversation as resolved.
Show resolved Hide resolved
handler(this.value, undefined);
this.handlerSet.add(handler);
return () => {
this.handlerSet.delete(handler);
};
};

public subscribeWithSelector = <O extends readonly unknown[]>(selector: (nextValue: T) => O, handler: Handler<O>) => {
// begin with undefined to reduce amount of selector calls
let selectedValues: O | undefined;

const wrappedHandler: Handler<T> = (nextValue) => {
const newlySelectedValues = selector(nextValue);
const hasUpdatedValues = selectedValues?.some((value, index) => value !== newlySelectedValues[index]) ?? true;

if (!hasUpdatedValues) return;

const oldSelectedValues = selectedValues;
selectedValues = newlySelectedValues;

handler(newlySelectedValues, oldSelectedValues);
};

return this.subscribe(wrappedHandler);
};
}
Loading
Loading