Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Automatic thread deletion using node-cron #9256

3 changes: 2 additions & 1 deletion apps/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
"@testing-library/user-event": "^14.5.2",
"@types/express": "^4.17.21",
"@types/jest": "^29.5.2",
"@types/node-cron": "^3.0.11",
yuki-takei marked this conversation as resolved.
Show resolved Hide resolved
"@types/react-input-autosize": "^2.2.4",
"@types/react-scroll": "^1.8.4",
"@types/react-stickynode": "^4.0.3",
Expand Down Expand Up @@ -275,8 +276,8 @@
"react-hotkeys": "^2.0.0",
"react-input-autosize": "^3.0.0",
"react-toastify": "^9.1.3",
"remark-github-admonitions-to-directives": "^2.0.0",
"rehype-rewrite": "^4.0.2",
"remark-github-admonitions-to-directives": "^2.0.0",
"replacestream": "^4.0.3",
"sass": "^1.53.0",
"simple-load-script": "^1.0.2",
Expand Down
10 changes: 9 additions & 1 deletion apps/app/src/features/openai/server/models/thread-relation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ interface ThreadRelationDocument extends ThreadRelation, Document {
updateThreadExpiration(): Promise<void>;
}

type ThreadRelationModel = Model<ThreadRelationDocument>
/**
 * Mongoose model type for thread relations, extended with the custom statics
 * that are attached to the schema.
 */
interface ThreadRelationModel extends Model<ThreadRelationDocument> {
// Typing for the static assigned to `schema.statics.getExpiredThreadRelations`;
// `limit` is optional and caps how many documents are returned.
getExpiredThreadRelations(limit?: number): Promise<ThreadRelationDocument[] | undefined>;
}

const schema = new Schema<ThreadRelationDocument, ThreadRelationModel>({
userId: {
Expand All @@ -41,6 +43,12 @@ const schema = new Schema<ThreadRelationDocument, ThreadRelationModel>({
},
});

/**
 * Finds thread relations whose `expiredAt` is at or before the current time.
 *
 * @param limit Maximum number of documents to return; falls back to 100 when omitted.
 * @returns The matching thread relation documents.
 */
schema.statics.getExpiredThreadRelations = async function(limit?: number): Promise<ThreadRelationDocument[] | undefined> {
  const maxCount = limit ?? 100;
  const expiredCondition = { expiredAt: { $lte: new Date() } };

  return this.find(expiredCondition).limit(maxCount).exec();
};

schema.methods.updateThreadExpiration = async function(): Promise<void> {
this.expiredAt = generateExpirationDate();
await this.save();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export class AzureOpenaiClientDelegator implements IOpenaiClientDelegator {
return this.client.beta.threads.retrieve(threadId);
}

/**
 * Deletes the thread with the given ID through the client's beta threads API.
 *
 * @param threadId ID of the thread to delete.
 * @returns The deletion result reported by the API.
 */
async deleteThread(threadId: string): Promise<OpenAI.Beta.Threads.ThreadDeleted> {
  const deletionResult = await this.client.beta.threads.del(threadId);
  return deletionResult;
}

/**
 * Creates a vector store named `growi-vector-store-<scopeType>`.
 *
 * @param scopeType Scope the vector store is created for; used as the name suffix.
 * @returns The vector store created by the API.
 */
async createVectorStore(scopeType:VectorStoreScopeType): Promise<OpenAI.Beta.VectorStores.VectorStore> {
  // The name must not contain a stray "{" before the interpolated scope type,
  // so that it matches the naming used by OpenaiClientDelegator.
  return this.client.beta.vectorStores.create({ name: `growi-vector-store-${scopeType}` });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { VectorStoreScopeType } from '~/features/openai/server/models/vecto
export interface IOpenaiClientDelegator {
createThread(vectorStoreId: string): Promise<OpenAI.Beta.Threads.Thread>
retrieveThread(threadId: string): Promise<OpenAI.Beta.Threads.Thread>
deleteThread(threadId: string): Promise<OpenAI.Beta.Threads.ThreadDeleted>
retrieveVectorStore(vectorStoreId: string): Promise<OpenAI.Beta.VectorStores.VectorStore>
createVectorStore(scopeType:VectorStoreScopeType): Promise<OpenAI.Beta.VectorStores.VectorStore>
uploadFile(file: Uploadable): Promise<OpenAI.Files.FileObject>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export class OpenaiClientDelegator implements IOpenaiClientDelegator {
return this.client.beta.threads.retrieve(threadId);
}

/**
 * Deletes the thread with the given ID through the client's beta threads API.
 *
 * @param threadId ID of the thread to delete.
 * @returns The deletion result reported by the API.
 */
async deleteThread(threadId: string): Promise<OpenAI.Beta.Threads.ThreadDeleted> {
  const deletionResult = await this.client.beta.threads.del(threadId);
  return deletionResult;
}

/**
 * Creates a vector store named `growi-vector-store-<scopeType>`.
 *
 * @param scopeType Scope the vector store is created for; used as the name suffix.
 * @returns The vector store created by the API.
 */
async createVectorStore(scopeType:VectorStoreScopeType): Promise<OpenAI.Beta.VectorStores.VectorStore> {
  const name = `growi-vector-store-${scopeType}`;
  return this.client.beta.vectorStores.create({ name });
}
Expand Down
23 changes: 23 additions & 0 deletions apps/app/src/features/openai/server/services/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ let isVectorStoreForPublicScopeExist = false;
export interface IOpenaiService {
getOrCreateThread(userId: string, vectorStoreId?: string, threadId?: string): Promise<OpenAI.Beta.Threads.Thread | undefined>;
getOrCreateVectorStoreForPublicScope(): Promise<VectorStoreDocument>;
deleteExpiredThreads(limit: number): Promise<void>;
createVectorStoreFile(pages: PageDocument[]): Promise<void>;
deleteVectorStoreFile(pageId: Types.ObjectId): Promise<void>;
rebuildVectorStoreAll(): Promise<void>;
Expand Down Expand Up @@ -70,6 +71,28 @@ class OpenaiService implements IOpenaiService {
return thread;
}

/**
 * Deletes expired threads through the OpenAI client and then removes the
 * corresponding thread relation documents.
 *
 * A failure on one thread is logged and does not stop the remaining deletions.
 * Only relations whose thread was reported as deleted are removed, so the rest
 * stay in the collection and are picked up again by a later run.
 *
 * @param limit Maximum number of expired thread relations to process in this call.
 */
public async deleteExpiredThreads(limit: number): Promise<void> {
  const threadRelations = await ThreadRelationModel.getExpiredThreadRelations(limit);
  if (threadRelations == null || threadRelations.length === 0) {
    return;
  }

  const deletedThreadIds: string[] = [];
  for (const threadRelation of threadRelations) {
    try {
      // Requests are awaited one at a time (the lint rule is disabled deliberately)
      // eslint-disable-next-line no-await-in-loop
      const deleteThreadResponse = await this.client.deleteThread(threadRelation.threadId);
      logger.debug('Delete thread', deleteThreadResponse);

      // Keep the relation when the API reports the thread was not deleted,
      // otherwise the only handle for retrying the deletion would be lost.
      if (deleteThreadResponse.deleted) {
        deletedThreadIds.push(threadRelation.threadId);
      }
      else {
        logger.warn('Thread was not deleted', deleteThreadResponse);
      }
    }
    catch (err) {
      logger.error(err);
    }
  }

  // Nothing was deleted remotely, so there is nothing to remove locally
  if (deletedThreadIds.length === 0) {
    return;
  }

  await ThreadRelationModel.deleteMany({ threadId: { $in: deletedThreadIds } });
}

public async getOrCreateVectorStoreForPublicScope(): Promise<VectorStoreDocument> {
const vectorStoreDocument = await VectorStoreModel.findOne({ scorpeType: VectorStoreScopeType.PUBLIC });

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import nodeCron from 'node-cron';

import { configManager } from '~/server/service/config-manager';
import loggerFactory from '~/utils/logger';

import { getOpenaiService, type IOpenaiService } from './openai';


const logger = loggerFactory('growi:service:thread-deletion-cron');

const DELETE_LIMIT = 100;

class ThreadDeletionCronService {

cronJob: nodeCron.ScheduledTask;

openaiService: IOpenaiService;

/**
 * Schedules and starts the thread-deletion cron job.
 *
 * Does nothing when the 'app:aiEnabled' config value is falsy.
 * A previously created job, if any, is stopped before the new one is started.
 *
 * @throws Error when getOpenaiService() returns null or undefined.
 */
startCron(): void {
const isAiEnabled = configManager.getConfig('crowi', 'app:aiEnabled');
if (!isAiEnabled) {
return;
}

const openaiService = getOpenaiService();
if (openaiService == null) {
throw new Error('OpenAI service is not initialized');
}

this.openaiService = openaiService;

// Cron expression: fires at minute 0 of every hour
const cronSchedule = '0 * * * *';
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一時間に一回実行

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

環境変数でカスタムできても良いかも


this.cronJob?.stop();
this.cronJob = this.generateCronJob(cronSchedule);
this.cronJob.start();
}

/**
 * Runs one deletion pass: asks the OpenAI service to delete expired threads,
 * passing DELETE_LIMIT as the maximum number to process.
 */
private async executeJob(): Promise<void> {
// The batch size is capped with OpenAI's rate limit in mind:
// at most DELETE_LIMIT (100) threads are deleted per run
await this.openaiService.deleteExpiredThreads(DELETE_LIMIT);
Copy link
Member Author

@miya miya Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • OpenAI の rate limit に考慮して最大100件まで thread を削除する
  • いっきに何千件も削除してしまった場合、メインのナレッジアシスタントに影響が出てしまう可能性があるため

}

/**
 * Creates a node-cron task that runs the deletion job on the given schedule.
 * Errors raised by the job are logged instead of being propagated.
 *
 * @param cronSchedule Cron expression passed to node-cron.
 * @returns The scheduled task returned by `nodeCron.schedule`.
 */
private generateCronJob(cronSchedule: string) {
  const onTick = async() => {
    try {
      await this.executeJob();
    }
    catch (err) {
      logger.error(err);
    }
  };

  return nodeCron.schedule(cronSchedule, onTick);
}

}

export default ThreadDeletionCronService;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 changes: 5 additions & 0 deletions apps/app/src/server/crowi/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import pkg from '^/package.json';

import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
import OpenaiThreadDeletionCronService from '~/features/openai/server/services/thread-deletion-cron';
import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
import QuestionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
import loggerFactory from '~/utils/logger';
Expand Down Expand Up @@ -102,6 +103,7 @@ class Crowi {
this.commentService = null;
this.questionnaireService = null;
this.questionnaireCronService = null;
this.openaiThreadDeletionCronService = null;

this.tokens = null;

Expand Down Expand Up @@ -312,6 +314,9 @@ Crowi.prototype.setupSocketIoService = async function() {
/**
 * Instantiates the cron services and starts each of them:
 * the questionnaire cron first, then the OpenAI thread-deletion cron.
 */
Crowi.prototype.setupCron = function() {
  const questionnaireCron = new QuestionnaireCronService(this);
  this.questionnaireCronService = questionnaireCron;
  questionnaireCron.startCron();

  const threadDeletionCron = new OpenaiThreadDeletionCronService();
  this.openaiThreadDeletionCronService = threadDeletionCron;
  threadDeletionCron.startCron();
};

Crowi.prototype.setupQuestionnaireService = function() {
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4687,6 +4687,11 @@
resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.31.tgz#31b7ca6407128a3d2bbc27fe2d21b345397f6197"
integrity sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==

"@types/node-cron@^3.0.11":
version "3.0.11"
resolved "https://registry.yarnpkg.com/@types/node-cron/-/node-cron-3.0.11.tgz#70b7131f65038ae63cfe841354c8aba363632344"
integrity sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==

"@types/node-fetch@^2.5.0", "@types/node-fetch@^2.6.4":
version "2.6.11"
resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.6.11.tgz#9b39b78665dae0e82a08f02f4967d62c66f95d24"
Expand Down
Loading