diff --git a/package-lock.json b/package-lock.json index be7da55..2b0067c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,6 +5,7 @@ "requires": true, "packages": { "": { + "name": "tgcalls", "version": "1.0.1", "license": "LGPL-3.0", "dependencies": { @@ -587,9 +588,9 @@ } }, "node_modules/path-parse": { - "version": "1.0.6", - "resolved": "https://registry.npmjs.org/path-parse/-/path-parse-1.0.6.tgz", - "integrity": "sha512-GSmOT2EbHrINBf9SR7CDELwlJ8AENk3Qn7OikK4nFYAu3Ote2+JYNVvkpAEQm3/TLNEJFD/xZJjzyxg3KBWOzw==", + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/path-parse/-/path-parse-1.0.7.tgz", + "integrity": "sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==", "dev": true }, "node_modules/process-nextick-args": { @@ -780,22 +781,41 @@ } }, "node_modules/tar": { - "version": "4.4.13", - "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.13.tgz", - "integrity": "sha512-w2VwSrBoHa5BsSyH+KxEqeQBAllHhccyMFVHtGtdMpF4W7IRWfZjFiQceJPChOeTsSDVUpER2T8FA93pr0L+QA==", + "version": "4.4.19", + "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.19.tgz", + "integrity": "sha512-a20gEsvHnWe0ygBY8JbxoM4w3SJdhc7ZAuxkLqh+nvNQN2IOt0B5lLgM490X5Hl8FF0dl0tOf2ewFYAlIFgzVA==", "dependencies": { - "chownr": "^1.1.1", - "fs-minipass": "^1.2.5", - "minipass": "^2.8.6", - "minizlib": "^1.2.1", - "mkdirp": "^0.5.0", - "safe-buffer": "^5.1.2", - "yallist": "^3.0.3" + "chownr": "^1.1.4", + "fs-minipass": "^1.2.7", + "minipass": "^2.9.0", + "minizlib": "^1.3.3", + "mkdirp": "^0.5.5", + "safe-buffer": "^5.2.1", + "yallist": "^3.1.1" }, "engines": { "node": ">=4.5" } }, + "node_modules/tar/node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, "node_modules/typedoc": { "version": "0.20.33", "resolved": "https://registry.npmjs.org/typedoc/-/typedoc-0.20.33.tgz", @@ -1405,9 +1425,9 @@ "integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=" }, "path-parse": { - "version": "1.0.6", - "resolved": "https://registry.npmjs.org/path-parse/-/path-parse-1.0.6.tgz", - "integrity": "sha512-GSmOT2EbHrINBf9SR7CDELwlJ8AENk3Qn7OikK4nFYAu3Ote2+JYNVvkpAEQm3/TLNEJFD/xZJjzyxg3KBWOzw==", + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/path-parse/-/path-parse-1.0.7.tgz", + "integrity": "sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==", "dev": true }, "process-nextick-args": { @@ -1562,17 +1582,24 @@ "integrity": "sha1-PFMZQukIwml8DsNEhYwobHygpgo=" }, "tar": { - "version": "4.4.13", - "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.13.tgz", - "integrity": "sha512-w2VwSrBoHa5BsSyH+KxEqeQBAllHhccyMFVHtGtdMpF4W7IRWfZjFiQceJPChOeTsSDVUpER2T8FA93pr0L+QA==", + "version": "4.4.19", + "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.19.tgz", + "integrity": "sha512-a20gEsvHnWe0ygBY8JbxoM4w3SJdhc7ZAuxkLqh+nvNQN2IOt0B5lLgM490X5Hl8FF0dl0tOf2ewFYAlIFgzVA==", "requires": { - "chownr": "^1.1.1", - "fs-minipass": "^1.2.5", - "minipass": "^2.8.6", - "minizlib": "^1.2.1", - "mkdirp": "^0.5.0", - "safe-buffer": "^5.1.2", - "yallist": "^3.0.3" + "chownr": "^1.1.4", + "fs-minipass": "^1.2.7", + "minipass": "^2.9.0", + "minizlib": "^1.3.3", + "mkdirp": "^0.5.5", + "safe-buffer": "^5.2.1", + "yallist": "^3.1.1" + }, + "dependencies": { + "safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==" + } } }, "typedoc": { diff --git a/src/stream.ts b/src/stream.ts index 487290f..c82f1bc 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,7 +1,11 @@ import { EventEmitter } from 'events'; import { Readable } from 'stream'; import { RTCVideoSource, RTCAudioSource, nonstandard } from 'wrtc'; -import { StreamOptions } from './types'; +import { + RemotePlayingTimeCallback, + StreamOptions, + RemoteLaggingCallback, +} from './types'; export declare interface Stream { on(event: 'pause', listener: (paused: boolean) => void): this; @@ -15,13 +19,14 @@ export class Stream extends EventEmitter { private readonly audioSource: RTCAudioSource; private readonly videoSource: RTCVideoSource; private readable?: Readable; - private cache: Buffer; + private cache: Buffer[]; + private _paused = false; private _finished = true; private _stopped = false; private _finishedLoading = false; private _emittedAlmostFinished = false; - + private lastDifference = 0; readonly video: boolean = false; public width: number = 640; public height: number = 360; @@ -30,6 +35,14 @@ export class Stream extends EventEmitter { readonly sampleRate: number; readonly channelCount: number; private almostFinishedTrigger: number; + private byteLength: number; + private cacheSize: number = 0; + private playedBytes = 0; + private chunk: Buffer; + private _readablePaused = false; + + remoteTime?: RemotePlayingTimeCallback; + remoteLagging?: RemoteLaggingCallback; constructor(readable?: Readable, options?: StreamOptions) { super(); @@ -50,12 +63,22 @@ export class Stream extends EventEmitter { this.audioSource = new nonstandard.RTCAudioSource(); this.videoSource = new nonstandard.RTCVideoSource(); - this.cache = Buffer.alloc(0); + this.byteLength = this.video + ? 1.5 * this.width * this.height + : ((this.sampleRate * this.bitsPerSample) / 8 / 100) * + this.channelCount; + this.cache = []; + + this.chunk = Buffer.alloc(this.byteLength); this.setReadable(readable); this.processData(); } + private requiredTime() { + return this.video ? 0.5 : 50; + } + setReadable(readable?: Readable) { if (this._stopped) { throw new Error('Cannot set readable when stopped'); @@ -66,14 +89,15 @@ export class Stream extends EventEmitter { this.readable.removeListener('end', this.endListener); } - this.cache = Buffer.alloc(0); - if (readable) { this._finished = false; this._finishedLoading = false; this._emittedAlmostFinished = false; this.readable = readable; - + this.cache.splice(0, this.cache.length); + this.cacheSize = 0; + this.playedBytes = 0; + this.chunk = Buffer.alloc(this.byteLength); this.readable.addListener('data', this.dataListener); this.readable.addListener('end', this.endListener); } @@ -116,72 +140,171 @@ export class Stream extends EventEmitter { : this.audioSource.createTrack(); } - private dataListener = ((data: any) => { - this.cache = Buffer.concat([this.cache, data]); + private dataListener = ((data: Buffer) => { + this.cache.push(data); + this.cacheSize += data.length; }).bind(this); private endListener = (() => { this._finishedLoading = true; }).bind(this); - private processData() { + private remoteIsLagging() { + if ( + this.remoteTime !== undefined && + !this.paused && + this.remoteLagging !== undefined + ) { + const time = this.time(); + const remoteTime = this.remoteTime().time; + + if (time !== undefined && remoteTime !== undefined) { + if (time > remoteTime) { + this.lastDifference = (time - remoteTime) * 100000; + return true; + } else if (this.remoteLagging().lagging && remoteTime > time) { + this.lastDifference = 0; + return true; + } + } + } + + return false; + } + + private processData = () => { if (this._stopped) { return; } - const byteLength = this.video - ? 1.5 * this.width * this.height - : ((this.sampleRate * this.bitsPerSample) / 8 / 100) * - this.channelCount; + const timeout = this.frameTime() - this.lastDifference; + + this.checkOverflow(); + + setTimeout(() => this.processData(), timeout); if ( !this._paused && !this._finished && - (this.cache.length >= byteLength || this._finishedLoading) + !this.remoteIsLagging() && + (this.cacheSize > this.byteLength || this._finishedLoading) ) { - const buffer = this.cache.slice(0, byteLength); - this.cache = this.cache.slice(byteLength); - - try { - if (this.video) { - this.videoSource.onFrame({ - data: new Uint8ClampedArray(buffer), - width: this.width, - height: this.height, - }); + let chunkSize = 0; + + this.cache.find((chunk, i) => { + if (chunkSize === 0) { + chunk.copy(this.chunk, 0, 0, this.byteLength); + chunkSize = Math.min(chunk.length, this.byteLength); + this.cache[i] = chunk.slice(this.byteLength); } else { - const samples = new Int16Array( - new Uint8Array(buffer).buffer, - ); - this.audioSource.onData({ - bitsPerSample: this.bitsPerSample, - sampleRate: this.sampleRate, - channelCount: this.channelCount, - numberOfFrames: samples.length, - samples, - }); + const req = this.byteLength - chunkSize; + const tmpChunk = + req < chunk.length ? chunk.slice(0, req) : chunk; + tmpChunk.copy(this.chunk, chunkSize); + chunkSize += tmpChunk.length; + this.cache[i] = chunk.slice(req); } - } catch (error) { - this.emit('error', error); - } + + return chunkSize >= this.byteLength; + }); + + this.cacheSize -= this.byteLength; + // remove empty buffers + this.cache.splice( + 0, + this.cache.findIndex(i => i.length), + ); + this.playedBytes += this.byteLength; + this.broadcast(); } if (!this._finished && this._finishedLoading) { if ( !this._emittedAlmostFinished && - this.cache.length < - byteLength + this.almostFinishedTrigger * this.sampleRate + this.cacheSize < + this.byteLength + + this.almostFinishedTrigger * this.sampleRate ) { this._emittedAlmostFinished = true; this.emit('almost-finished'); - } else if (this.cache.length < byteLength) { + } else if (this.cacheSize < this.byteLength) { this.finish(); } } + }; + + private checkOverflow() { + const frameTime = this.video ? this.framerate : 100; + const cachedTime = this.cacheSize / this.byteLength / frameTime; + const neededTime = this.video ? 5 : 60; + if (cachedTime > neededTime) { + if (!this._readablePaused) { + this.readable!.pause(); + this._readablePaused = true; + } + } else if (cachedTime < neededTime / 2 && this._readablePaused) { + this.readable!.resume(); + this._readablePaused = false; + } + } + + public isLagging() { + if (this._finishedLoading) { + return false; + } - setTimeout( - () => this.processData(), - this.video ? 1000 / this.framerate : 10, - ); + return this.cacheSize < this.byteLength * this.requiredTime(); + } + + private frameTime(): number { + return this.finished || + this.paused || + this.isLagging() || + this.readable === undefined + ? 500 + : this.video + ? 1000 / this.framerate + : 10; + } + + private broadcast() { + if (this.cacheSize < this.byteLength) { + return; + } + + try { + if (this.video) { + this.videoSource.onFrame({ + data: new Uint8ClampedArray(this.chunk), + width: this.width, + height: this.height, + }); + } else { + const samples = new Int16Array( + new Uint8Array(this.chunk).buffer, + ); + this.audioSource.onData({ + bitsPerSample: this.bitsPerSample, + sampleRate: this.sampleRate, + channelCount: this.channelCount, + numberOfFrames: samples.length, + samples, + }); + } + } catch (error) { + this.emit('error', error); + } + } + + time(): number | undefined { + if (this.readable === undefined || this.finished) { + return undefined; + } else { + return Math.ceil( + this.playedBytes / + this.byteLength / + (0.00001 / this.frameTime()), + ); + } } } diff --git a/src/types.ts b/src/types.ts index 0acd827..25fe482 100644 --- a/src/types.ts +++ b/src/types.ts @@ -80,3 +80,14 @@ export interface JoinVoiceCallResponse { export type JoinVoiceCallCallback = ( payload: JoinVoiceCallParams, ) => Promise; + +export interface RemotePlayingTimeResponse { + time?: number; +} + +export interface RemoteLaggingResponse { + lagging: boolean; +} + +export type RemotePlayingTimeCallback = () => RemotePlayingTimeResponse; +export type RemoteLaggingCallback = () => RemoteLaggingResponse;