Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

imprv: Use stream.pipeline #9361

Merged
merged 25 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@
"remark-stringify": "^11.0.0",
"sanitize-filename": "^1.6.3",
"socket.io": "^4.7.5",
"stream-to-promise": "^3.0.0",
"string-width": "=4.2.2",
"superjson": "^1.9.1",
"swagger-jsdoc": "^6.2.8",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Installing a GROWI template plugin', () => {
it('install() should success', async() => {
// when
const result = await growiPluginService.install({
url: 'https://github.com/weseek/growi-plugin-templates-for-office',
url: 'https://github.com/growilabs/growi-plugin-templates-for-office',
});
const count = await GrowiPlugin.count({ 'meta.name': 'growi-plugin-templates-for-office' });

Expand All @@ -20,7 +20,7 @@ describe('Installing a GROWI template plugin', () => {
expect(count).toBe(1);
expect(fs.existsSync(path.join(
PLUGIN_STORING_PATH,
'weseek',
'growilabs',
'growi-plugin-templates-for-office',
))).toBeTruthy();
});
Expand All @@ -33,7 +33,7 @@ describe('Installing a GROWI template plugin', () => {
// setup
const dummyFilePath = path.join(
PLUGIN_STORING_PATH,
'weseek',
'growilabs',
'growi-plugin-templates-for-office',
'dummy.txt',
);
Expand All @@ -42,7 +42,7 @@ describe('Installing a GROWI template plugin', () => {

// when
const result = await growiPluginService.install({
url: 'https://github.com/weseek/growi-plugin-templates-for-office',
url: 'https://github.com/growilabs/growi-plugin-templates-for-office',
});
const count2 = await GrowiPlugin.count({ 'meta.name': 'growi-plugin-templates-for-office' });

Expand All @@ -59,7 +59,7 @@ describe('Installing a GROWI theme plugin', () => {
it('install() should success', async() => {
// when
const result = await growiPluginService.install({
url: 'https://github.com/weseek/growi-plugin-theme-vivid-internet',
url: 'https://github.com/growilabs/growi-plugin-theme-vivid-internet',
});
const count = await GrowiPlugin.count({ 'meta.name': 'growi-plugin-theme-vivid-internet' });

Expand All @@ -68,7 +68,7 @@ describe('Installing a GROWI theme plugin', () => {
expect(count).toBe(1);
expect(fs.existsSync(path.join(
PLUGIN_STORING_PATH,
'weseek',
'growilabs',
'growi-plugin-theme-vivid-internet',
))).toBeTruthy();
});
Expand All @@ -88,7 +88,7 @@ describe('Installing a GROWI theme plugin', () => {
expect(results.themeMetadata).not.toBeNull();
expect(results.themeHref).not.toBeNull();
expect(results.themeHref
.startsWith('/static/plugins/weseek/growi-plugin-theme-vivid-internet/dist/assets/style-')).toBeTruthy();
.startsWith('/static/plugins/growilabs/growi-plugin-theme-vivid-internet/dist/assets/style-')).toBeTruthy();
});

});
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import fs, { readFileSync } from 'fs';
import path from 'path';
import { pipeline } from 'stream/promises';

import { GrowiPluginType } from '@growi/core';
import type { GrowiThemeMetadata, ViteManifest } from '@growi/core';
Expand All @@ -8,7 +9,6 @@ import { importPackageJson, validateGrowiDirective } from '@growi/pluginkit/dist
// eslint-disable-next-line no-restricted-imports
import axios from 'axios';
import type mongoose from 'mongoose';
import streamToPromise from 'stream-to-promise';
import unzipStream from 'unzip-stream';

import loggerFactory from '~/utils/logger';
Expand Down Expand Up @@ -209,10 +209,8 @@ export class GrowiPluginService implements IGrowiPluginService {

private async unzip(zipFilePath: fs.PathLike, destPath: fs.PathLike): Promise<void> {
try {
const stream = fs.createReadStream(zipFilePath);
const unzipFileStream = stream.pipe(unzipStream.Extract({ path: destPath.toString() }));

await streamToPromise(unzipFileStream);
const readZipStream = fs.createReadStream(zipFilePath);
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed
await pipeline(readZipStream, unzipStream.Extract({ path: destPath.toString() }));
}
catch (err) {
logger.error(err);
Expand Down
5 changes: 2 additions & 3 deletions apps/app/src/features/openai/server/services/openai.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import assert from 'node:assert';
import { Readable, Transform } from 'stream';
import { pipeline } from 'stream/promises';

import { PageGrant, isPopulated } from '@growi/core';
import type { HydratedDocument, Types } from 'mongoose';
Expand Down Expand Up @@ -342,9 +343,7 @@ class OpenaiService implements IOpenaiService {
},
});

pagesStream
.pipe(batchStrem)
.pipe(createVectorStoreFileStream);
await pipeline(pagesStream, batchStrem, createVectorStoreFileStream);
}

async rebuildVectorStore(page: HydratedDocument<PageDocument>) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

import mongoose from 'mongoose';
import streamToPromise from 'stream-to-promise';

import getPageModel from '~/server/models/page';
import { Revision } from '~/server/models/revision';
Expand Down Expand Up @@ -56,11 +56,7 @@ module.exports = {
},
});

pagesStream
.pipe(batchStrem)
.pipe(migratePagesStream);

await streamToPromise(migratePagesStream);
await pipeline(pagesStream, batchStrem, migratePagesStream);

logger.info('Migration has successfully applied');
},
Expand Down Expand Up @@ -107,11 +103,7 @@ module.exports = {
},
});

pagesStream
.pipe(batchStrem)
.pipe(migratePagesStream);

await streamToPromise(migratePagesStream);
await pipeline(pagesStream, batchStrem, migratePagesStream);

logger.info('Migration down has successfully applied');
},
Expand Down
5 changes: 3 additions & 2 deletions apps/app/src/server/routes/apiv3/page/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import path from 'path';
import { pipeline, type Readable } from 'stream';

import type { IPage } from '@growi/core';
import {
Expand Down Expand Up @@ -735,7 +736,7 @@ module.exports = (crowi) => {
fileName = '_top';
}

let stream;
let stream: Readable;

try {
stream = exportService.getReadStreamFromRevision(revision, format);
Expand All @@ -760,7 +761,7 @@ module.exports = (crowi) => {
};
await crowi.activityService.createActivity(parameters);

return stream.pipe(res);
return pipeline(stream, res);
});

/**
Expand Down
16 changes: 5 additions & 11 deletions apps/app/src/server/service/export.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ const logger = loggerFactory('growi:services:ExportService'); // eslint-disable-
const fs = require('fs');
const path = require('path');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

const archiver = require('archiver');
const mongoose = require('mongoose');
const streamToPromise = require('stream-to-promise');

const CollectionProgressingStatus = require('../models/vo/collection-progressing-status');

Expand Down Expand Up @@ -107,7 +107,7 @@ class ExportService {
writeStream.write(JSON.stringify(metaData));
writeStream.close();

await streamToPromise(writeStream);
await pipeline([writeStream]);

return metaJson;
}
Expand Down Expand Up @@ -196,12 +196,7 @@ class ExportService {
const jsonFileToWrite = path.join(this.baseDir, `${collectionName}.json`);
const writeStream = fs.createWriteStream(jsonFileToWrite, { encoding: this.growiBridgeService.getEncoding() });

readStream
.pipe(logStream)
.pipe(transformStream)
.pipe(writeStream);

await streamToPromise(writeStream);
await pipeline(readStream, logStream, transformStream, writeStream);

return writeStream.path;
}
Expand Down Expand Up @@ -355,13 +350,12 @@ class ExportService {
const output = fs.createWriteStream(zipFile);

// pipe archive data to the file
archive.pipe(output);
const stream = pipeline(archive, output);

// finalize the archive (ie we are done appending files but streams have to finish yet)
// 'close', 'end' or 'finish' may be fired right after calling this method so register to them beforehand
archive.finalize();

await streamToPromise(archive);
await stream;

logger.info(`zipped GROWI data into ${zipFile} (${archive.pointer()} bytes)`);

Expand Down
12 changes: 7 additions & 5 deletions apps/app/src/server/service/file-uploader/local.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { ReadStream } from 'fs';
import type { Writable } from 'stream';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

import type { Response } from 'express';

Expand All @@ -24,7 +26,6 @@ const fsPromises = require('fs/promises');
const path = require('path');

const mkdir = require('mkdirp');
const streamToPromise = require('stream-to-promise');
const urljoin = require('url-join');


Expand Down Expand Up @@ -163,8 +164,9 @@ module.exports = function(crowi) {
// mkdir -p
mkdir.sync(dirpath);

const stream = fileStream.pipe(fs.createWriteStream(filePath));
return streamToPromise(stream);
const writeStream: Writable = fs.createWriteStream(filePath);

return pipeline(fileStream, writeStream);
};

lib.saveFile = async function({ filePath, contentType, data }) {
Expand All @@ -177,8 +179,8 @@ module.exports = function(crowi) {
const fileStream = new Readable();
fileStream.push(data);
fileStream.push(null); // EOF
const stream = fileStream.pipe(fs.createWriteStream(absFilePath));
return streamToPromise(stream);
const writeStream: Writable = fs.createWriteStream(absFilePath);
return pipeline(fileStream, writeStream);
};

/**
Expand Down
11 changes: 7 additions & 4 deletions apps/app/src/server/service/growi-bridge/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import fs from 'fs';
import path from 'path';
import { pipeline } from 'stream';
import { pipeline as pipelinePromise } from 'stream/promises';

import streamToPromise from 'stream-to-promise';
import unzipStream, { type Entry } from 'unzip-stream';

import loggerFactory from '~/utils/logger';
Expand Down Expand Up @@ -78,10 +79,12 @@ class GrowiBridgeService {
let meta = {};

const readStream = fs.createReadStream(zipFile);
const unzipStreamPipe = readStream.pipe(unzipStream.Parse());
const parseStream = unzipStream.Parse();
const unzipEntryStream = pipeline(readStream, parseStream);

let tapPromise;

const unzipEntryStream = unzipStreamPipe.on('entry', (entry: Entry) => {
unzipEntryStream.on('entry', (entry: Entry) => {
const fileName = entry.path;
const size = entry.size; // might be undefined in some archives
if (fileName === this.getMetaFileName()) {
Expand All @@ -100,7 +103,7 @@ class GrowiBridgeService {
});

try {
await streamToPromise(unzipEntryStream);
await pipelinePromise([unzipEntryStream]);
await tapPromise;
}
// if zip is broken
Expand Down
21 changes: 8 additions & 13 deletions apps/app/src/server/service/import/import.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import fs from 'fs';
import path from 'path';
import type { EventEmitter } from 'stream';
import { Writable, Transform } from 'stream';
import { Writable, Transform, pipeline } from 'stream';
import { pipeline as pipelinePromise } from 'stream/promises';

import JSONStream from 'JSONStream';
import gc from 'expose-gc/function';
Expand All @@ -10,7 +11,6 @@ import type {
} from 'mongodb';
import type { Document } from 'mongoose';
import mongoose from 'mongoose';
import streamToPromise from 'stream-to-promise';
import unzipStream from 'unzip-stream';

import { ImportMode } from '~/models/admin/import-mode';
Expand Down Expand Up @@ -267,13 +267,7 @@ export class ImportService {
},
});

readStream
.pipe(jsonStream)
.pipe(convertStream)
.pipe(batchStream)
.pipe(writeStream);

await streamToPromise(writeStream);
await pipelinePromise(readStream, jsonStream, convertStream, batchStream, writeStream);

// clean up tmp directory
fs.unlinkSync(jsonFile);
Expand Down Expand Up @@ -349,10 +343,11 @@ export class ImportService {
*/
async unzip(zipFile) {
const readStream = fs.createReadStream(zipFile);
const unzipStreamPipe = readStream.pipe(unzipStream.Parse());
const parseStream = unzipStream.Parse();
const unzipStreamPipe = pipeline(readStream, parseStream);
const files: string[] = [];

unzipStreamPipe.on('entry', (/** @type {Entry} */ entry) => {
const unzipEntryStream = unzipStreamPipe.on('entry', (/** @type {Entry} */ entry) => {
const fileName = entry.path;
// https://regex101.com/r/mD4eZs/6
// prevent from unexpecting attack doing unzip file (path traversal attack)
Expand All @@ -370,12 +365,12 @@ export class ImportService {
else {
const jsonFile = path.join(this.baseDir, fileName);
const writeStream = fs.createWriteStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
entry.pipe(writeStream);
pipeline(entry, writeStream);
files.push(jsonFile);
}
});

await streamToPromise(unzipStreamPipe);
await pipelinePromise([unzipEntryStream]);

return files;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Writable } from 'stream';
import { pipeline } from 'stream/promises';

import { getIdForRef } from '@growi/core';
import type { IPage, Ref } from '@growi/core';
import { isUsersHomepage } from '@growi/core/dist/utils/page-path-utils';
import type { HydratedDocument } from 'mongoose';
import mongoose from 'mongoose';
import streamToPromise from 'stream-to-promise';

import type { PageDocument, PageModel } from '~/server/models/page';
import { createBatchStream } from '~/server/util/batch-stream';
Expand Down Expand Up @@ -87,8 +87,9 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
.lean()
.cursor({ batchSize: BULK_REINDEX_SIZE });

let count = 0;
const batchStream = createBatchStream(BULK_REINDEX_SIZE);

let count = 0;
const writeStream = new Writable({
objectMode: true,
async write(batch, encoding, callback) {
Expand All @@ -109,11 +110,7 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
},
});

readStream
.pipe(createBatchStream(BULK_REINDEX_SIZE))
.pipe(writeStream);

await streamToPromise(writeStream);
await pipeline(readStream, batchStream, writeStream);
// ────────┤ end │─────────
}
catch (err) {
Expand Down
Loading
Loading