Skip to content

Commit

Permalink
feat: threads 2.0 (#1330)
Browse files Browse the repository at this point in the history
Co-authored-by: Matvei Andrienko <m.y.andrienko@outlook.com>
  • Loading branch information
arnautov-anton and myandrienko committed Sep 2, 2024
1 parent 8e9bc86 commit 4b1ffe8
Show file tree
Hide file tree
Showing 14 changed files with 2,439 additions and 419 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@
"eslint-plugin-typescript-sort-keys": "1.5.0",
"husky": "^4.3.8",
"lint-staged": "^15.2.2",
"mocha": "^9.1.3",
"mocha": "^10.7.0",
"nyc": "^15.1.0",
"prettier": "^2.2.1",
"rollup": "^2.41.0",
"rollup-plugin-peer-deps-external": "^2.2.4",
"rollup-plugin-terser": "^7.0.2",
"sinon": "^12.0.1",
"standard-version": "^9.3.2",
"typescript": "^4.2.3",
"typescript": "4.2.3",
"uuid": "^8.3.2"
},
"scripts": {
Expand All @@ -118,7 +118,7 @@
"test-types": "node test/typescript/index.js && tsc --esModuleInterop true --noEmit true --strictNullChecks true --noImplicitAny true --strict true test/typescript/*.ts",
"eslint": "eslint '**/*.{js,md,ts}' --max-warnings 0 --ignore-path ./.eslintignore",
"eslint-fix": "npx eslint --fix '**/*.{js,md,ts}' --max-warnings 0 --ignore-path ./.eslintignore",
"test-unit": "NODE_ENV=test mocha --exit --bail --timeout 20000 --require ./babel-register test/unit/*.js",
"test-unit": "NODE_ENV=test mocha --exit --bail --timeout 20000 --require ./babel-register test/unit/*.{js,test.ts}",
"test-coverage": "nyc yarn test-unit",
"test": "yarn test-unit",
"testwatch": "NODE_ENV=test nodemon ./node_modules/.bin/mocha --timeout 20000 --require test-entry.js test/test.js",
Expand All @@ -132,5 +132,6 @@
},
"engines": {
"node": ">=16"
}
},
"packageManager": "yarn@1.22.21+sha1.1959a18351b811cdeedbd484a8f86c3cc3bbaf72"
}
4 changes: 3 additions & 1 deletion src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,9 @@ export class Channel<StreamChatGenerics extends ExtendableGenerics = DefaultGene
}

/**
* getReplies - List the message replies for a parent message
* getReplies - List the message replies for a parent message.
*
* The recommended way of working with threads is to use the Thread class.
*
* @param {string} parent_id The message parent id, ie the top of the thread
* @param {MessagePaginationOptions & { user?: UserResponse<StreamChatGenerics>; user_id?: string }} options Pagination params, ie {limit:10, id_lte: 10}
Expand Down
27 changes: 7 additions & 20 deletions src/channel_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
ReactionResponse,
UserResponse,
} from './types';
import { addToMessageList } from './utils';
import { addToMessageList, formatMessage } from './utils';
import { DEFAULT_MESSAGE_SET_PAGINATION } from './constants';

type ChannelReadStatus<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = Record<
Expand Down Expand Up @@ -59,6 +59,7 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
* The messages array contains the currently active set
*/
messageSets: MessageSet[] = [];

constructor(channel: Channel<StreamChatGenerics>) {
this._channel = channel;
this.watcher_count = 0;
Expand Down Expand Up @@ -134,26 +135,12 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
}

/**
* formatMessage - Takes the message object. Parses the dates, sets __html
* and sets the status to received if missing. Returns a message object
*
* @param {MessageResponse<StreamChatGenerics>} message a message object
* Takes the message object, parses the dates, sets `__html`
* and sets the status to `received` if missing; returns a new message object.
*
* @param {MessageResponse<StreamChatGenerics>} message `MessageResponse` object
*/
formatMessage(message: MessageResponse<StreamChatGenerics>): FormatMessageResponse<StreamChatGenerics> {
return {
...message,
/**
* @deprecated please use `html`
*/
__html: message.html,
// parse the date..
pinned_at: message.pinned_at ? new Date(message.pinned_at) : null,
created_at: message.created_at ? new Date(message.created_at) : new Date(),
updated_at: message.updated_at ? new Date(message.updated_at) : new Date(),
status: message.status || 'received',
};
}
formatMessage = (message: MessageResponse<StreamChatGenerics>) => formatMessage<StreamChatGenerics>(message);

/**
* addMessagesSorted - Add the list of messages to state and resorts the messages
Expand Down Expand Up @@ -666,7 +653,7 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
messages[i] = {
...m,
type: 'deleted',
deleted_at: user.deleted_at,
deleted_at: user.deleted_at ? new Date(user.deleted_at) : null,
};
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ import {
import { InsightMetrics, postInsights } from './insights';
import { Thread } from './thread';
import { Moderation } from './moderation';
import { ThreadManager } from './thread_manager';
import { DEFAULT_QUERY_CHANNELS_MESSAGE_LIST_PAGE_SIZE } from './constants';

function isString(x: unknown): x is string {
Expand All @@ -221,6 +222,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
activeChannels: {
[key: string]: Channel<StreamChatGenerics>;
};
threads: ThreadManager<StreamChatGenerics>;
anonymous: boolean;
persistUserOnConnectionFailure?: boolean;
axiosInstance: AxiosInstance;
Expand Down Expand Up @@ -340,6 +342,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.setUserPromise = null;
// keeps a reference to all the channels that are in use
this.activeChannels = {};

// mapping between channel groups and configs
this.configs = {};
this.anonymous = false;
Expand All @@ -351,8 +354,8 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.consecutiveFailures = 0;
this.insightMetrics = new InsightMetrics();

this.defaultWSTimeoutWithFallback = 6000;
this.defaultWSTimeout = 15000;
this.defaultWSTimeoutWithFallback = 6 * 1000;
this.defaultWSTimeout = 15 * 1000;

this.axiosInstance.defaults.paramsSerializer = axiosParamsSerializer;

Expand Down Expand Up @@ -406,6 +409,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
*/
this.logger = isFunction(inputOptions.logger) ? inputOptions.logger : () => null;
this.recoverStateOnReconnect = this.options.recoverStateOnReconnect;
this.threads = new ThreadManager({ client: this });
}

/**
Expand Down Expand Up @@ -606,7 +610,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
tags: ['connection', 'client'],
});

return Promise.resolve();
return;
}

this.clientID = `${this.userID}--${randomId()}`;
Expand Down Expand Up @@ -2754,7 +2758,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
const res = await this.post<QueryThreadsAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads`, opts);

return {
threads: res.threads.map((thread) => new Thread(this, thread)),
threads: res.threads.map((thread) => new Thread({ client: this, threadData: thread })),
next: res.next,
};
}
Expand Down Expand Up @@ -2787,7 +2791,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
opts,
);

return new Thread<StreamChatGenerics>(this, res.thread);
return new Thread<StreamChatGenerics>({ client: this, threadData: res.thread });
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './client_state';
export * from './channel';
export * from './channel_state';
export * from './thread';
export * from './thread_manager';
export * from './connection';
export * from './events';
export * from './moderation';
Expand All @@ -15,3 +16,4 @@ export * from './types';
export * from './segment';
export * from './campaign';
export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils';
export * from './store';
57 changes: 57 additions & 0 deletions src/store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
export type Patch<T> = (value: T) => T;
export type Handler<T> = (nextValue: T, previousValue: T | undefined) => void;
export type Unsubscribe = () => void;

function isPatch<T>(value: T | Patch<T>): value is Patch<T> {
return typeof value === 'function';
}

export class StateStore<T extends Record<string, unknown>> {
private handlerSet = new Set<Handler<T>>();

constructor(private value: T) {}

public next = (newValueOrPatch: T | Patch<T>): void => {
// newValue (or patch output) should never be mutated previous value
const newValue = isPatch(newValueOrPatch) ? newValueOrPatch(this.value) : newValueOrPatch;

// do not notify subscribers if the value hasn't changed
if (newValue === this.value) return;

const oldValue = this.value;
this.value = newValue;

this.handlerSet.forEach((handler) => handler(this.value, oldValue));
};

public partialNext = (partial: Partial<T>): void => this.next((current) => ({ ...current, ...partial }));

public getLatestValue = (): T => this.value;

public subscribe = (handler: Handler<T>): Unsubscribe => {
handler(this.value, undefined);
this.handlerSet.add(handler);
return () => {
this.handlerSet.delete(handler);
};
};

public subscribeWithSelector = <O extends readonly unknown[]>(selector: (nextValue: T) => O, handler: Handler<O>) => {
// begin with undefined to reduce amount of selector calls
let selectedValues: O | undefined;

const wrappedHandler: Handler<T> = (nextValue) => {
const newlySelectedValues = selector(nextValue);
const hasUpdatedValues = selectedValues?.some((value, index) => value !== newlySelectedValues[index]) ?? true;

if (!hasUpdatedValues) return;

const oldSelectedValues = selectedValues;
selectedValues = newlySelectedValues;

handler(newlySelectedValues, oldSelectedValues);
};

return this.subscribe(wrappedHandler);
};
}
Loading

0 comments on commit 4b1ffe8

Please sign in to comment.