-
Notifications
You must be signed in to change notification settings - Fork 5
/
index.js
111 lines (95 loc) · 3.87 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
const fs = require("fs");
const path = require("path");
const stream = require("stream");
/**
 * Copies the partition files, one after another, into `combinationStream`, starting at
 * `partitionIndex`, then ends `combinationStream`.
 *
 * @param {number} partitionIndex - index of the next partition to copy.
 * @param {string[]} partitionNames - partition file paths, copied in array order.
 * @param {stream.Writable} combinationStream - destination; ended after the last partition,
 *   destroyed with the error if a partition cannot be read.
 * @param {(err?: Error) => void} callback - called with no arguments right after
 *   `combinationStream.end()` has been called, or with the read error of the first partition
 *   that fails.
 */
const _mergeFiles = (partitionIndex, partitionNames, combinationStream, callback) => {
  if (partitionIndex >= partitionNames.length) {
    combinationStream.end();
    return callback();
  }
  const partitionFileStream = fs.createReadStream(partitionNames[partitionIndex]);
  partitionFileStream.once("error", (err) => {
    // Without a listener a missing/unreadable partition would crash the process, and the
    // destination would never end (a stream consumer would wait forever).
    partitionFileStream.unpipe(combinationStream);
    combinationStream.destroy(err);
    callback(err);
  });
  partitionFileStream.once("end", () =>
    _mergeFiles(partitionIndex + 1, partitionNames, combinationStream, callback));
  // pipe() honours the destination's backpressure; `end: false` keeps it open for the next
  // partition (ending it is done above, once every partition has been copied).
  partitionFileStream.pipe(combinationStream, { end: false });
};
/**
 * Concatenates the partition files, in array order, into the file at `outputPath`.
 *
 * @param {string[]} partitionNames - partition file paths, merged in array order.
 * @param {string} outputPath - destination file, opened with fs.createWriteStream.
 * @param {(err?: Error) => void} callback - invoked exactly once: with no arguments after the
 *   output stream has emitted 'finish', or with the first error reported by `_mergeFiles` or
 *   emitted by the output stream.
 */
module.exports.mergeFilesToDisk = (partitionNames, outputPath, callback) => {
  const combinationStream = fs.createWriteStream(outputPath);
  let settled = false;
  const settle = (err) => {
    if (settled) {
      return;
    }
    settled = true;
    if (err) {
      callback(err);
    } else {
      callback();
    }
  };
  // `end()` only queues the final write; the output is not fully written until 'finish', so
  // completion is signalled from there rather than from `_mergeFiles`' callback.
  combinationStream.on("finish", () => settle());
  combinationStream.on("error", settle);
  _mergeFiles(0, partitionNames, combinationStream, (err) => {
    if (err) {
      settle(err);
    }
  });
};
/**
 * Exposes the partition files, concatenated in array order, as a single PassThrough stream
 * that is passed to `callback`.
 *
 * The stream is handed over right after the merge has been started and is ended by
 * `_mergeFiles` once the last partition has been copied; no separate completion callback
 * is delivered.
 *
 * @param {string[]} partitionNames - partition file paths, merged in array order.
 * @param {(mergedStream: stream.PassThrough) => void} callback - receives the merged stream.
 */
module.exports.mergeFilesToStream = (partitionNames, callback) => {
  const mergedStream = new stream.PassThrough();
  const onMergeComplete = () => { };
  _mergeFiles(0, partitionNames, mergedStream, onMergeComplete);
  callback(mergedStream);
};
/**
 * Splits a readable stream into consecutive partition streams of at most `partitionStreamSize`.
 *
 * Size is counted per chunk: `.length` of each Buffer/string chunk from a byte or string stream,
 * or the UTF-8 byte length of `JSON.stringify(object)` in object mode. Byte/string input is read
 * in pieces that exactly fill each partition. In object mode each serialised object is written
 * whole (objects are concatenated with no separator), so a partition is closed early when the
 * next object would overflow it.
 *
 * @param {(partitionNum: number) => stream.Writable} outStreamCreate - called with 0, 1, 2, ...
 *   to open each partition; a partition is only opened once there is data to write to it.
 * @param {stream.Readable} fileStream - source, consumed through its 'readable' event.
 * @param {number} partitionStreamSize - maximum size of one partition.
 * @param {(err: Error|null, outStreams: stream.Writable[]) => void} callback - called exactly
 *   once, after the source has ended (or splitting stopped on an error) and every partition
 *   stream has emitted 'finish' or 'error'. `err` is the first error seen: a RangeError when one
 *   serialised object is larger than `partitionStreamSize`, or an error emitted by the source or
 *   a partition. On error the source is left unconsumed (not destroyed) and the open partition
 *   is ended.
 */
const _splitToStream = (outStreamCreate, fileStream, partitionStreamSize, callback) => {
  // Node's internal readable state supplies the read size and whether the source is object mode.
  const { highWaterMark: defaultChunkSize, objectMode: isObjectMode } = fileStream._readableState;
  const outStreams = [];
  let currentOutStream = null;
  let currentFileSize = 0;
  let settledWriteStreams = 0;
  let inputDone = false;
  let callbackCalled = false;
  let err = null;

  // Invoke the callback once input is done and every opened partition has settled. Checked on
  // both sides (input end and partition finish) so neither ordering can miss it; an empty source
  // opens no partition and completes from the input side.
  const maybeFinish = () => {
    if (callbackCalled || !inputDone || settledWriteStreams !== outStreams.length) {
      return;
    }
    callbackCalled = true;
    callback(err, outStreams);
  };

  const endCurrentWriteStream = () => {
    currentOutStream.end();
    currentOutStream = null;
    currentFileSize = 0;
  };

  // Stop splitting (normally at 'end', earlier on error) and end the open partition, if any.
  const finishInput = (error) => {
    if (error && err === null) {
      err = error;
    }
    if (inputDone) {
      return;
    }
    inputDone = true;
    if (currentOutStream !== null) {
      endCurrentWriteStream();
    }
    maybeFinish();
  };

  const createNewWriteStream = () => {
    const outStream = outStreamCreate(outStreams.length);
    let settled = false;
    const onSettled = (error) => {
      // Count each partition once, even if it emits both 'finish' and 'error'.
      if (settled) {
        return;
      }
      settled = true;
      settledWriteStreams++;
      if (error) {
        finishInput(error);
      }
      maybeFinish();
    };
    outStream.on("finish", () => onSettled(null));
    outStream.on("error", onSettled);
    outStreams.push(outStream);
    currentOutStream = outStream;
  };

  fileStream.on("readable", () => {
    let chunk;
    while (!inputDone
      && null !== (chunk = fileStream.read(Math.min(partitionStreamSize - currentFileSize, defaultChunkSize)))) {
      // A Readable stream in object mode will always return a single item from a call to readable.read(size), regardless of the value of the size argument.
      const writeChunk = isObjectMode ? JSON.stringify(chunk) : chunk;
      // The JSON string is written as UTF-8, so count its bytes rather than UTF-16 code units.
      const chunkSize = isObjectMode ? Buffer.byteLength(writeChunk) : writeChunk.length;
      if (chunkSize > partitionStreamSize) {
        // A single serialised object that is larger than a whole partition can never fit.
        finishInput(new RangeError("Could not fit object into maxFileSize"));
        return;
      }
      if (currentOutStream !== null && currentFileSize + chunkSize > partitionStreamSize) {
        endCurrentWriteStream();
      }
      if (currentOutStream === null) {
        createNewWriteStream();
      }
      currentOutStream.write(writeChunk);
      currentFileSize += chunkSize;
      if (currentFileSize >= partitionStreamSize) {
        endCurrentWriteStream();
      }
    }
  });
  fileStream.on("end", () => finishInput(null));
  fileStream.on("error", (error) => finishInput(error));
};
/**
 * Splits `fileStream` into partition files of at most `maxFileSize`, named
 * `<rootFilePath>.split-<n>` where n is the partition number.
 *
 * @param {stream.Readable} fileStream - source to split.
 * @param {number} maxFileSize - maximum partition size.
 * @param {string} rootFilePath - prefix for every partition file path.
 * @param {(err: Error|null, partitionNames: string[]) => void} callback - receives the written paths.
 */
const split = (fileStream, maxFileSize, rootFilePath, callback) => {
  const partitionPathFor = (partitionNum) => `${rootFilePath}.split-${partitionNum}`;
  return _split(fileStream, maxFileSize, partitionPathFor, callback);
};
/**
 * Builds a splitter like `split`, but the partition file paths come from `generateFilePath`.
 *
 * @param {(partitionNum: number) => string} generateFilePath - returns the path of partition n.
 * @returns {(fileStream: stream.Readable, maxFileSize: number, callback: Function) => void}
 *   a function that forwards its arguments, plus `generateFilePath`, to `_split`.
 */
const getSplitWithGenFilePath = (generateFilePath) => {
  return (fileStream, maxFileSize, callback) => {
    return _split(fileStream, maxFileSize, generateFilePath, callback);
  };
};
/**
 * Splits `fileStream` into files created with fs.createWriteStream at `generateFilePath(n)`,
 * each holding at most `maxFileSize`.
 *
 * @param {stream.Readable} fileStream - source to split.
 * @param {number} maxFileSize - maximum partition size; must be a number greater than 0.
 * @param {(partitionNum: number) => string} generateFilePath - path for partition n.
 * @param {(err: Error|null, partitionNames: string[]) => void} callback - receives any error
 *   from `_splitToStream` and the `path` of every partition file stream, in creation order.
 * @throws {RangeError} synchronously when `maxFileSize` is not greater than 0.
 */
const _split = (fileStream, maxFileSize, generateFilePath, callback) => {
  // `!(x > 0)` also rejects NaN/undefined, which `x <= 0` silently lets through.
  if (!(maxFileSize > 0)) {
    throw new RangeError("maxFileSize must be greater than 0");
  }
  const outStreamCreate = (partitionNum) => fs.createWriteStream(generateFilePath(partitionNum));
  _splitToStream(outStreamCreate, fileStream, maxFileSize, (err, fileWriteStreams) => {
    callback(err, fileWriteStreams.map((fileWriteStream) => fileWriteStream.path));
  });
};
// Public API of the split side; the merge helpers are attached to module.exports where defined.
Object.assign(module.exports, {
  split,
  getSplitWithGenFilePath,
  _splitToStream,
});