Skip to content

Commit

Permalink
ワーカーの構成を変更 / 期限切れのコンテキストの削除機構を追加
Browse files Browse the repository at this point in the history
  • Loading branch information
tissueMO committed Nov 3, 2024
1 parent 38cb759 commit 7c1881f
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 76 deletions.
37 changes: 34 additions & 3 deletions extensions/discord/addon/RecordAddon.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ dayjs.tz.setDefault('Asia/Tokyo');

const s3Client = new S3Client();

const DEFAULT_EXPIRES = 43200;

/**
* 任意のボイスチャンネルの文字起こしと要約を行います。
*/
Expand Down Expand Up @@ -148,6 +150,9 @@ class RecordAddon extends Addon {
return;
}

// 古いデータをクリア
await this.#clean(guild.id);

/** @type {VoiceChannel} 呼び出したユーザーが参加しているボイスチャンネル */
const channel = interaction.member.voice?.channel;

Expand Down Expand Up @@ -181,6 +186,7 @@ class RecordAddon extends Addon {
await Promise.resolve()
.then(() => this.#capture(connection, {
contextId: uuid(),
guildId: guild.id,
channelId: channel.id,
userId,
userName: guild.members.cache.get(userId).displayName,
Expand Down Expand Up @@ -323,11 +329,11 @@ class RecordAddon extends Addon {
* @returns {Promise<void>}
*/
async #enqueueConvertWorker(context) {
const { contextId, start } = context;
const { contextId, guildId, start } = context;
const workerPrefix = new ConvertWorker().prefix;

await this.#redisClient.multi()
.setEx(`${process.env.REDIS_NAMESPACE}:context:${contextId}`, 43200, JSON.stringify(context))
.setEx(`${process.env.REDIS_NAMESPACE}:context:${contextId}`, this.settings[guildId]?.expires ?? DEFAULT_EXPIRES, JSON.stringify(context))
.zAdd(`${process.env.REDIS_NAMESPACE}:contexts`, { score: dayjs(start).valueOf(), value: contextId })
.lPush(`${process.env.REDIS_NAMESPACE}:${workerPrefix}:queue`, contextId)
.exec();
Expand Down Expand Up @@ -403,10 +409,35 @@ class RecordAddon extends Addon {

const summary = data.choices[0]?.message?.content;

console.log(`[RecordAddon] トークン消費:`, data.usage);
console.log(`[RecordAddon] OpenAIトークン消費:`, data.usage);

return summary;
}

/**
* 期限切れのコンテキストを一括削除します。
* @param {string} guildId
* @returns {Promiose<void>}
*/
async #clean(guildId) {
// 対象取得
const limit = dayjs().tz().subtract(this.settings[guildId]?.expires ?? DEFAULT_EXPIRES, 'second');
const contextIds = await this.#redisClient.zRangeByScore(`${process.env.REDIS_NAMESPACE}:contexts`, 0, limit.valueOf());
if (!contextIds.length) {
console.info(`[RecordAddon] 有効期限切れのコンテキストはありません。`);
return;
}

// 一括削除
const multi = this.#redisClient.multi();
for (const contextId of contextIds) {
multi.del(`${process.env.REDIS_NAMESPACE}:context:${contextId}`);
}
multi.zRemRangeByScore(`${process.env.REDIS_NAMESPACE}:contexts`, 0, limit.valueOf());
await multi.exec();

console.info(`[RecordAddon] 有効期限切れのコンテキスト: ${contextIds.length}件 を削除しました。`);
}
}

module.exports = RecordAddon;
1 change: 1 addition & 0 deletions extensions/discord/config.example.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module.exports = {
records: [
{
guildId: '111111111111111111',
expires: 43200,
},
],
hooks: [
Expand Down
62 changes: 29 additions & 33 deletions extensions/discord/worker/ConvertWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,49 +31,45 @@ class ConvertWorker extends Worker {
}

/**
* PCM→MP3形式に変換します。
* @override
*/
async process() {
const ids = await this.dequeueAll();
async process(id) {
console.info(`<${this.prefix}> ID: ${id} の変換開始...`);

// PCM→MP3形式に変換
for (const id of ids) {
console.info(`<${this.prefix}> ID: ${id} の変換開始...`);
// 処理対象のデータを取得
const srcBaseName = `${id}.pcm`;
const srcFile = path.join(process.env.WORKER_PATH, srcBaseName);
const { Body: srcData } = await s3Client.send(new GetObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${srcBaseName}`,
}));

// 処理対象のデータを取得
const srcBaseName = `${id}.pcm`;
const srcFile = path.join(process.env.WORKER_PATH, srcBaseName);
const { Body: srcData } = await s3Client.send(new GetObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${srcBaseName}`,
}));
await pipeline(srcData, createWriteStream(srcFile));

await pipeline(srcData, createWriteStream(srcFile));
// 変換実行
const destBaseName = `${id}.mp3`;
const destFile = path.join(process.env.WORKER_PATH, destBaseName);
await this.#pcmToMp3(srcFile, destFile);

// 変換実行
const destBaseName = `${id}.mp3`;
const destFile = path.join(process.env.WORKER_PATH, destBaseName);
await this.#pcmToMp3(srcFile, destFile);
// 次のキューへ登録
await this.redisClient.lPush(`${process.env.REDIS_NAMESPACE}:${this.nextWorkerPrefix}:queue`, id);

// 次のキューへ登録
await this.redisClient.lPush(`${process.env.REDIS_NAMESPACE}:${this.nextWorkerPrefix}:queue`, id);
await s3Client.send(new PutObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${destBaseName}`,
Body: createReadStream(destFile),
}));

await s3Client.send(new PutObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${destBaseName}`,
Body: createReadStream(destFile),
}));
// 後片付け
await fs.unlink(srcFile);

// 後片付け
await fs.unlink(srcFile);
await s3Client.send(new DeleteObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${srcBaseName}`,
}));

await s3Client.send(new DeleteObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${srcBaseName}`,
}));

console.info(`<${this.prefix}> ID: ${id} の変換完了`);
}
console.info(`<${this.prefix}> ID: ${id} の変換完了`);
}

/**
Expand Down
60 changes: 28 additions & 32 deletions extensions/discord/worker/TranscribeWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,45 @@ class TranscribeWorker extends Worker {
}

/**
* 文字起こしを行います。
* @override
*/
async process() {
const ids = await this.dequeueAll();
async process(id) {
console.info(`<${this.prefix}> ID: ${id} の変換開始...`);

// 文字起こし
for (const id of ids) {
console.info(`<${this.prefix}> ID: ${id} の変換開始...`);
// 処理対象のデータを取得
const baseName = `${id}.mp3`;
const { Body: srcData } = await s3Client.send(new GetObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${baseName}`,
}));

// 処理対象のデータを取得
const baseName = `${id}.mp3`;
const { Body: srcData } = await s3Client.send(new GetObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${baseName}`,
}));
// 文字起こし実行
const requestData = new FormData();
requestData.append('file', srcData);

// 文字起こし実行
const requestData = new FormData();
requestData.append('file', srcData);
const { data: responseData } = await whisperClient.post('/transcribe', requestData, { headers: requestData.getHeaders() });

const { data: responseData } = await whisperClient.post('/transcribe', requestData, { headers: requestData.getHeaders() });
// 後片付け
await s3Client.send(new DeleteObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${baseName}`,
}));

// 後片付け
await s3Client.send(new DeleteObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: `${process.env.S3_PREFIX}${baseName}`,
}));
// コンテキストに文字起こし結果を反映
const context = await this.redisClient.get(`${process.env.REDIS_NAMESPACE}:context:${id}`)
.then(context => context ? JSON.parse(context) : null);

// コンテキストに文字起こし結果を反映
const context = await this.redisClient.get(`${process.env.REDIS_NAMESPACE}:context:${id}`)
.then(context => context ? JSON.parse(context) : null);

if (!context) {
console.error(`<${this.prefix}> ID: ${id} の変換失敗: 格納先コンテキストがありません。`);
continue;
}
if (!context) {
console.warn(`<${this.prefix}> ID: ${id} の変換失敗: 格納先コンテキストがありません。`);
return;
}

context['transcription'] = responseData['transcription'] ?? '(文字起こし失敗)';
context['transcription'] = responseData['transcription'] ?? '(文字起こし失敗)';

await this.redisClient.setEx(`${process.env.REDIS_NAMESPACE}:context:${id}`, 43200, JSON.stringify(context));
await this.redisClient.setEx(`${process.env.REDIS_NAMESPACE}:context:${id}`, 43200, JSON.stringify(context));

console.info(`<${this.prefix}> ID: ${id} の文字起こし完了`);
}
console.info(`<${this.prefix}> ID: ${id} の文字起こし完了`);
}
}

Expand Down
20 changes: 19 additions & 1 deletion extensions/discord/worker/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,26 @@ class Worker {

/**
* 1回分の処理を実行します。
* @param {string} id
*/
async process() {
async process(id) {
}

/**
* ワーカーキューにデータを登録します。
* @param {string[]} ids
* @returns {Promise<void>}
*/
async enqueue(ids) {
if (!ids.length) {
return;
}

const multi = worker.redisClient.multi();
for (const id of ids) {
multi.lPush(`${process.env.REDIS_NAMESPACE}:${this.prefix}:queue`, id);
}
await multi.exec();
}

/**
Expand Down
31 changes: 24 additions & 7 deletions extensions/discord/worker/WorkerManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@ class WorkerManager {
await worker.initialize();

while (true) {
try {
await worker.process();
} catch (err) {
console.error(`[ERROR] <${worker.prefix}>`, err);
}

await this.#process(worker);
await setTimeout(1000);
}
}));
Expand All @@ -45,9 +40,31 @@ class WorkerManager {
async once() {
await Promise.all(this.#workers.map(async worker => {
await worker.initialize();
await worker.process();
await this.#process(worker);
}));
}

/**
* ワーカーの処理を一度実行し、失敗した場合は復旧を試みます。
* @param {Worker} worker
*/
async #process(worker) {
const ids = await worker.dequeueAll();
const failedIds = [];

// 1件ずつ処理
for (const id of ids) {
try {
await worker.process(id);
} catch (err) {
console.error(`[ERROR] <${worker.prefix}>`, err);
failedIds.push(id);
}
}

// 失敗したIDをキューに戻す
await worker.enqueue(failedIds);
}
}

module.exports = WorkerManager;

0 comments on commit 7c1881f

Please sign in to comment.