Skip to content

Commit

Permalink
feat: add StreamSpeed#getSpeed() to get current speed
Browse files Browse the repository at this point in the history
  • Loading branch information
fent committed Apr 11, 2020
1 parent 595d0ce commit 868f2e9
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 42 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ Adds stream to group.
### StreamSpeed#remove(stream)
Removes stream from group.

### StreamSpeed#getSpeed()
Get current speed.

### StreamSpeed#getStreams()
Returns a list of all streams in the group.

### StreamSpeed#speed
Curent speed.

### StreamSpeed.toHuman(bytes, options)
Convenient method to convert `bytes` to a human readable string.
Helper method to convert `bytes` to a human readable string.

```js
StreamSpeed.toHuman(1500); // 1.46KB
Expand Down
27 changes: 16 additions & 11 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ module.exports = class StreamSpeed extends EventEmitter {
range: 1000,
}, options);
this._streams = [];
this.speed = 0;
}


Expand All @@ -31,20 +30,16 @@ module.exports = class StreamSpeed extends EventEmitter {
* @param {number} speed
*/
_update(meta, speed) {
meta.speed = speed;

this._streams.forEach((m) => {
// Skip own stream, streams that haven't started,
// and streams that are paused.
if (m === meta || m.speed === 0) {
// Skip own stream.
if (m === meta) {
return;
}

// Add other streams' speeds to total.
speed += m.speed;
speed += m.speedo.getSpeed();
});

this.speed = speed;
this.emit('speed', speed);
}

Expand Down Expand Up @@ -84,14 +79,15 @@ module.exports = class StreamSpeed extends EventEmitter {
this._streams.splice(this._streams.indexOf(meta), 1);
};

const reader = new Speedometer(this.options);
const stream = origstream.pipe(new PassThrough());

const meta = {
stream : origstream,
speed : 0,
speedo : reader,
cleanup,
};
this._streams.push(meta);
const reader = new Speedometer(this.options);
const stream = origstream.pipe(new PassThrough());
const onUpdate = this._update.bind(this, meta);

stream.on('readable', onReadable);
Expand All @@ -115,6 +111,15 @@ module.exports = class StreamSpeed extends EventEmitter {
}


/**
* Get current speed across all streams.
*/
getSpeed() {
return this._streams
.reduce((sum, meta) => sum + meta.speedo.getSpeed(), 0);
}


/**
* Converts bytes to human readable unit.
* Thank you Amir from StackOverflow.
Expand Down
41 changes: 24 additions & 17 deletions lib/speedometer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module.exports = class Speedometer {
constructor(options) {
this.options = options;
this.history = [];
this.speed = 0;
this._lastSpeed = 0;
}


Expand All @@ -23,32 +23,39 @@ module.exports = class Speedometer {
update(data, callback) {
const now = Date.now();

if (this.history.length) {
// Remove old data events.
let index = this.history.findIndex(item => {
return item.time > now - this.options.range;
});
this.history = index > -1 ? this.history.slice(index) : [];
}

this.history.push({
speed: data.length,
time: now,
});

let speed = this.getSpeed();
const change = this._lastSpeed !== speed;
this._lastSpeed = speed;

// Only emit event if there is a change in speed.
if (change) {
callback(speed);
}
}

/**
* Gets current speed.
*/
getSpeed() {
let now = Date.now();
if (this.history.length) {
// Remove old data events.
let index = this.history
.findIndex(item => item.time > now - this.options.range);
this.history = index > -1 ? this.history.slice(index) : [];
}

// Get total data emitted in `range` time period.
let totaldata = this.history.reduce((sum, point) => point.speed + sum, 0);

let speed = Math.round(
totaldata / this.options.range * this.options.timeUnit
);

const change = this.speed !== speed;
this.speed = speed;

// Only emit event if there is a change in speed.
if (change) {
callback(speed);
}
return speed;
}
};
1 change: 1 addition & 0 deletions test/group-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ describe('Create a group and write to it', () => {
s1.interval(100, 6, 200, { end: true });
s1.on('end', () => {
assert.equal(group.getStreams().length, 1);
assert.equal(group.getSpeed(), 0);
done();
});
});
Expand Down
20 changes: 10 additions & 10 deletions test/single-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,23 @@ const assertSpeed = (spy, speeds, startIndex = 0) => {

describe('Read from a stream', () => {
describe('with no `timeUnit`', () => {
it('Calculates constant speed', (done) => {
it('Calculates constant speed', async () => {
const ss = new StreamSpeed();
const rs = new MockStream();
ss.add(rs);
const spy = sinon.spy();
ss.on('speed', spy);

// Write at 4000 B/s.
rs.interval(200, 15, 200, { end: true });
// Write at 1000 B/s.
await rs.interval(200, 15, 200);

rs.on('end', () => {
// Ramps up to 1000
assertSpeed(spy, [200, 400, 600, 800, 1000]);
// Ramps up to 1000 B/s.
assertSpeed(spy, [200, 400, 600, 800, 1000]);

// After 5 calls, speed is constant.
assert.equal(spy.callCount, 5);
// After 5 calls, speed is constant.
assert.equal(spy.callCount, 5);
assert.equal(ss.getSpeed(), 1000);

done();
});
});
});

Expand Down Expand Up @@ -118,12 +116,14 @@ describe('With custom `range`', () => {

// Pause for a few secs.
await MockStream.timeout(10000);
assert.equal(ss.getSpeed(), 0, 'Speed should be 0 by now');

// Write at 1200 B/s.
await rs.interval(300, 8, 250);

// Speed changes quickly.
assertSpeed(spy, [300, 600, 900, 1200], 4);
assert.equal(ss.getSpeed(), 1200);
});
});
});

0 comments on commit 868f2e9

Please sign in to comment.