Skip to content

Commit

Permalink
Merge pull request #9256 from weseek/feat/155763-automatic-thread-del…
Browse files Browse the repository at this point in the history
…etion

feat: Automatic thread deletion using node-cron
  • Loading branch information
mergify[bot] authored Oct 18, 2024
2 parents 6d3919d + fb767d3 commit cc279f4
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 2 deletions.
3 changes: 2 additions & 1 deletion apps/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
"@testing-library/user-event": "^14.5.2",
"@types/express": "^4.17.21",
"@types/jest": "^29.5.2",
"@types/node-cron": "^3.0.11",
"@types/react-input-autosize": "^2.2.4",
"@types/react-scroll": "^1.8.4",
"@types/react-stickynode": "^4.0.3",
Expand Down Expand Up @@ -275,8 +276,8 @@
"react-hotkeys": "^2.0.0",
"react-input-autosize": "^3.0.0",
"react-toastify": "^9.1.3",
"remark-github-admonitions-to-directives": "^2.0.0",
"rehype-rewrite": "^4.0.2",
"remark-github-admonitions-to-directives": "^2.0.0",
"replacestream": "^4.0.3",
"sass": "^1.53.0",
"simple-load-script": "^1.0.2",
Expand Down
10 changes: 9 additions & 1 deletion apps/app/src/features/openai/server/models/thread-relation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ interface ThreadRelationDocument extends ThreadRelation, Document {
updateThreadExpiration(): Promise<void>;
}

type ThreadRelationModel = Model<ThreadRelationDocument>
interface ThreadRelationModel extends Model<ThreadRelationDocument> {
getExpiredThreadRelations(limit?: number): Promise<ThreadRelationDocument[] | undefined>;
}

const schema = new Schema<ThreadRelationDocument, ThreadRelationModel>({
userId: {
Expand All @@ -41,6 +43,12 @@ const schema = new Schema<ThreadRelationDocument, ThreadRelationModel>({
},
});

schema.statics.getExpiredThreadRelations = async function(limit?: number): Promise<ThreadRelationDocument[] | undefined> {
const currentDate = new Date();
const expiredThreadRelations = await this.find({ expiredAt: { $lte: currentDate } }).limit(limit ?? 100).exec();
return expiredThreadRelations;
};

schema.methods.updateThreadExpiration = async function(): Promise<void> {
this.expiredAt = generateExpirationDate();
await this.save();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export class AzureOpenaiClientDelegator implements IOpenaiClientDelegator {
return this.client.beta.threads.retrieve(threadId);
}

async deleteThread(threadId: string): Promise<OpenAI.Beta.Threads.ThreadDeleted> {
return this.client.beta.threads.del(threadId);
}

async createVectorStore(scopeType:VectorStoreScopeType): Promise<OpenAI.Beta.VectorStores.VectorStore> {
return this.client.beta.vectorStores.create({ name: `growi-vector-store-{${scopeType}` });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { VectorStoreScopeType } from '~/features/openai/server/models/vecto
export interface IOpenaiClientDelegator {
createThread(vectorStoreId: string): Promise<OpenAI.Beta.Threads.Thread>
retrieveThread(threadId: string): Promise<OpenAI.Beta.Threads.Thread>
deleteThread(threadId: string): Promise<OpenAI.Beta.Threads.ThreadDeleted>
retrieveVectorStore(vectorStoreId: string): Promise<OpenAI.Beta.VectorStores.VectorStore>
createVectorStore(scopeType:VectorStoreScopeType): Promise<OpenAI.Beta.VectorStores.VectorStore>
uploadFile(file: Uploadable): Promise<OpenAI.Files.FileObject>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export class OpenaiClientDelegator implements IOpenaiClientDelegator {
return this.client.beta.threads.retrieve(threadId);
}

async deleteThread(threadId: string): Promise<OpenAI.Beta.Threads.ThreadDeleted> {
return this.client.beta.threads.del(threadId);
}

async createVectorStore(scopeType:VectorStoreScopeType): Promise<OpenAI.Beta.VectorStores.VectorStore> {
return this.client.beta.vectorStores.create({ name: `growi-vector-store-${scopeType}` });
}
Expand Down
23 changes: 23 additions & 0 deletions apps/app/src/features/openai/server/services/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ let isVectorStoreForPublicScopeExist = false;
export interface IOpenaiService {
getOrCreateThread(userId: string, vectorStoreId?: string, threadId?: string): Promise<OpenAI.Beta.Threads.Thread | undefined>;
getOrCreateVectorStoreForPublicScope(): Promise<VectorStoreDocument>;
deleteExpiredThreads(limit: number): Promise<void>;
createVectorStoreFile(pages: PageDocument[]): Promise<void>;
deleteVectorStoreFile(pageId: Types.ObjectId): Promise<void>;
rebuildVectorStoreAll(): Promise<void>;
Expand Down Expand Up @@ -70,6 +71,28 @@ class OpenaiService implements IOpenaiService {
return thread;
}

public async deleteExpiredThreads(limit: number): Promise<void> {
const expiredThreadRelations = await ThreadRelationModel.getExpiredThreadRelations(limit);
if (expiredThreadRelations == null) {
return;
}

const deletedThreadIds: string[] = [];
for (const expiredThreadRelation of expiredThreadRelations) {
try {
// eslint-disable-next-line no-await-in-loop
const deleteThreadResponse = await this.client.deleteThread(expiredThreadRelation.threadId);
logger.debug('Delete thread', deleteThreadResponse);
deletedThreadIds.push(expiredThreadRelation.threadId);
}
catch (err) {
logger.error(err);
}
}

await ThreadRelationModel.deleteMany({ threadId: { $in: deletedThreadIds } });
}

public async getOrCreateVectorStoreForPublicScope(): Promise<VectorStoreDocument> {
const vectorStoreDocument = await VectorStoreModel.findOne({ scorpeType: VectorStoreScopeType.PUBLIC });

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import nodeCron from 'node-cron';

import { configManager } from '~/server/service/config-manager';
import loggerFactory from '~/utils/logger';

import { getOpenaiService, type IOpenaiService } from './openai';


const logger = loggerFactory('growi:service:thread-deletion-cron');

const DELETE_LIMIT = 100;

class ThreadDeletionCronService {

cronJob: nodeCron.ScheduledTask;

openaiService: IOpenaiService;

startCron(): void {
const isAiEnabled = configManager.getConfig('crowi', 'app:aiEnabled');
if (!isAiEnabled) {
return;
}

const openaiService = getOpenaiService();
if (openaiService == null) {
throw new Error('OpenAI service is not initialized');
}

this.openaiService = openaiService;

// Executed at 0 minutes of every hour
const cronSchedule = '0 * * * *';

this.cronJob?.stop();
this.cronJob = this.generateCronJob(cronSchedule);
this.cronJob.start();
}

private async executeJob(): Promise<void> {
// Must be careful of OpenAI's rate limit
// Delete up to 100 threads per hour
await this.openaiService.deleteExpiredThreads(DELETE_LIMIT);
}

private generateCronJob(cronSchedule: string) {
return nodeCron.schedule(cronSchedule, async() => {
try {
await this.executeJob();
}
catch (e) {
logger.error(e);
}
});
}

}

export default ThreadDeletionCronService;
5 changes: 5 additions & 0 deletions apps/app/src/server/crowi/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import pkg from '^/package.json';

import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
import OpenaiThreadDeletionCronService from '~/features/openai/server/services/thread-deletion-cron';
import QuestionnaireService from '~/features/questionnaire/server/service/questionnaire';
import QuestionnaireCronService from '~/features/questionnaire/server/service/questionnaire-cron';
import loggerFactory from '~/utils/logger';
Expand Down Expand Up @@ -102,6 +103,7 @@ class Crowi {
this.commentService = null;
this.questionnaireService = null;
this.questionnaireCronService = null;
this.openaiThreadDeletionCronService = null;

this.tokens = null;

Expand Down Expand Up @@ -312,6 +314,9 @@ Crowi.prototype.setupSocketIoService = async function() {
Crowi.prototype.setupCron = function() {
this.questionnaireCronService = new QuestionnaireCronService(this);
this.questionnaireCronService.startCron();

this.openaiThreadDeletionCronService = new OpenaiThreadDeletionCronService();
this.openaiThreadDeletionCronService.startCron();
};

Crowi.prototype.setupQuestionnaireService = function() {
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4687,6 +4687,11 @@
resolved "https://registry.yarnpkg.com/@types/ms/-/ms-0.7.31.tgz#31b7ca6407128a3d2bbc27fe2d21b345397f6197"
integrity sha512-iiUgKzV9AuaEkZqkOLDIvlQiL6ltuZd9tGcW3gwpnX8JbuiuhFlEGmmFXEXkN50Cvq7Os88IY2v0dkDqXYWVgA==

"@types/node-cron@^3.0.11":
version "3.0.11"
resolved "https://registry.yarnpkg.com/@types/node-cron/-/node-cron-3.0.11.tgz#70b7131f65038ae63cfe841354c8aba363632344"
integrity sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==

"@types/node-fetch@^2.5.0", "@types/node-fetch@^2.6.4":
version "2.6.11"
resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.6.11.tgz#9b39b78665dae0e82a08f02f4967d62c66f95d24"
Expand Down

0 comments on commit cc279f4

Please sign in to comment.