Skip to content

Commit

Permalink
Implement out of order transform (#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
mshima authored Apr 27, 2021
1 parent e262506 commit 8d65397
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 18 deletions.
8 changes: 5 additions & 3 deletions lib/environment.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ const YeomanRepository = require('./util/repository');
const Conflicter = require('./util/conflicter');
const {YeomanCommand} = require('./util/command');
const {
createConflicterCheckTransform,
createConflicterStatusTransform,
createEachFileTransform,
createModifiedTransform,
createYoRcTransform,
createYoResolveTransform
} = require('./util/transform');
Expand Down Expand Up @@ -1098,14 +1100,14 @@ class Environment extends Base {
}
return pipeline(
stream,
createEachFileTransform({forwardUmodified: false}),
createModifiedTransform(),
...transformStreams,
createEachFileTransform(file => {
if (log) {
log.completeWork(10);
npmlog.info('Completed', path.relative(this.logCwd, file.path));
}
}, {autoForward: false})
}, {autoForward: false, logName: 'environment:log'})
).then(() => {
if (log) {
log.finish();
Expand All @@ -1125,7 +1127,7 @@ class Environment extends Base {
const transformStreams = [
createYoResolveTransform(this.conflicter),
createYoRcTransform(),
createEachFileTransform(file => this.conflicter.checkForCollision(file)),
createConflicterCheckTransform(this.conflicter),
createConflicterStatusTransform()
];
this.fs.commit(transformStreams, stream, (error, value) => {
Expand Down
65 changes: 65 additions & 0 deletions lib/util/out-of-order-transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const debug = require('debug');
const {default: PQueue} = require('p-queue');
const {Transform} = require('stream');

module.exports = class OOOTransform extends Transform {
/**
* @private
* Out Of Order Transform
*/
constructor(options) {
// Out of order only makes sense for objectMode.
// transform is used locally, forward undefined to prevent conflicts.
super({...options, objectMode: true, transform: undefined});

this.logName = options.logName || Math.random().toString(36).slice(7);
this.queue = new PQueue();
this.oooTransform = options.transform;

this.debug = debug(`ooo-transform:${this.logName}`);
this.debug('New Transform');

if (this.debug.enabled) {
this.on('end', () => this.debug('event:end'));
this.on('finish', () => this.debug('event:finish'));
this.on('drain', () => this.debug('event:drain'));
this.on('close', () => this.debug('event:close'));
this.on('unpipe', () => this.debug('event:unpipe'));
this.on('unpipe', () => this.debug('event:unpipe'));
this.queue.on('add', () => this.debug('++ task: queue size %d, pending %d', this.queue.size, this.queue.pending));
this.queue.on('next', () => this.debug('-- task: queue size %d, pending %d', this.queue.size, this.queue.pending));
}
}

_final(cb) {
this.finalized = true;
this.debug('_flush');
this.queue.onIdle().then(() => cb());
}

_executeTransform(chunk, enc) {
return Promise.resolve(this.oooTransform(chunk, enc, (error, chunk) => {
if (error) {
this.destroy(error);
} else if (chunk) {
this.push(chunk);
}
})).catch(error => this.destroy(error));
}

_transform(chunk, enc, cb) {
if (this.finalized) {
cb(new Error('Transform already finalized'));
return;
}
this.debug('_transform %s', chunk.path);
this.queue.add(() => this._executeTransform(chunk, enc));

setTimeout(() => cb());
}

_destroy(error, cb) {
this.debug('_destroy %s', error);
this.queue.onIdle().then(() => cb(error));
}
};
35 changes: 20 additions & 15 deletions lib/util/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ const fs = require('fs');
const path = require('path');
const findUp = require('find-up');
const minimatch = require('minimatch');
const {Transform} = require('stream');
const OOOTransform = require('./out-of-order-transform');

class YoResolveError extends WError {}

function createFileTransform(
transform = (file, _enc, cb) => cb(null, file),
options = {}
) {
const stream = new Transform({
objectMode: true,
return new OOOTransform({
...options,
transform(...args) {
return transform.apply(this, args);
return transform.call(this, ...args);
}
});
return stream;
}

/**
Expand Down Expand Up @@ -48,25 +46,22 @@ function createEachFileTransform(forEach, options = {}) {
options = forEach;
forEach = () => {};
}
const {forwardUmodified = true, executeUnmodified = false, autoForward = true, autoContinue = true} = options;
const {forwardUmodified = true, executeUnmodified = false, autoForward = true} = options;
return createFileTransform(function (file, enc, cb) {
const forward = () => {
if (autoForward && (forwardUmodified || fileIsModified(file))) {
this.push(file);
}
if (autoContinue) {
cb();
}
};
if (!executeUnmodified && !fileIsModified(file)) {
forward();
return;
}
Promise
.resolve(forEach.call(this, file, enc, autoContinue ? undefined : cb))
return Promise
.resolve(forEach.call(this, file, enc))
.then(() => forward())
.catch(error => cb(error));
});
}, options);
}

function parseYoAttributesFile(yoAttributeFileName) {
Expand All @@ -88,6 +83,10 @@ function parseYoAttributesFile(yoAttributeFileName) {
);
}

function createConflicterCheckTransform(conflicter) {
return createEachFileTransform(file => conflicter.checkForCollision(file), {logName: 'environment:conflicter-check'});
}

function getConflicterStatusForFile(conflicter, filePath, yoAttributeFileName = '.yo-resolve') {
const fileDir = path.dirname(filePath);
conflicter.yoResolveByFile = conflicter.yoResolveByFile || {};
Expand Down Expand Up @@ -131,7 +130,7 @@ function getConflicterStatusForFile(conflicter, filePath, yoAttributeFileName =
function createYoResolveTransform(conflicter, yoResolveFileName) {
return createEachFileTransform(file => {
file.conflicter = file.conflicter || getConflicterStatusForFile(conflicter, file.path, yoResolveFileName);
});
}, {logName: 'environment:yo-resolve'});
}

/**
Expand All @@ -145,7 +144,7 @@ function createYoRcTransform() {
if (filename === '.yo-rc.json' || filename === '.yo-rc-global.json') {
file.conflicter = 'force';
}
});
}, {logName: 'environment:yo-rc'});
}

/**
Expand Down Expand Up @@ -173,7 +172,11 @@ function createConflicterStatusTransform() {
} else {
this.push(file);
}
}, {autoForward: false});
}, {autoForward: false, logName: 'environment:conflicter-status'});
}

function createModifiedTransform() {
return createEachFileTransform({forwardUmodified: false, logName: 'environment:modified'});
}

module.exports = {
Expand All @@ -183,5 +186,7 @@ module.exports = {
createEachFileTransform,
createYoResolveTransform,
createYoRcTransform,
createModifiedTransform,
createConflicterCheckTransform,
createConflicterStatusTransform
};
54 changes: 54 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"mem-fs-editor": "^8.1.2",
"minimatch": "^3.0.4",
"npmlog": "^4.1.2",
"p-queue": "^6.6.2",
"pacote": "^11.2.6",
"preferred-pm": "^3.0.3",
"pretty-bytes": "^5.3.0",
Expand Down

0 comments on commit 8d65397

Please sign in to comment.