Skip to content

Commit

Permalink
Merge pull request #77 from forscht/hotfix/downloadfile
Browse files Browse the repository at this point in the history
hotfix: fix download files
  • Loading branch information
forscht authored Mar 30, 2023
2 parents 1edb7d0 + 7d17a28 commit b87e37b
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 43 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@forscht/ddrive",
"version": "4.1.0",
"version": "4.2.0",
"description": "A lightweight cloud storage system using discord as storage device written in nodejs",
"main": "src/index.js",
"author": "Darshan Patel <darshanhihoriya@gmail.com> (https://github.com/forscht/ddrive)",
Expand Down
3 changes: 2 additions & 1 deletion src/DFs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const crypto = require('crypto')
const { REST } = require('@discordjs/rest')
const _ = require('lodash')
const uuid = require('uuid').v4
const AsyncStreamProcessorWithConcurrency = require('./lib/AsyncStreamProcessorWithConcurrency')
const AsyncStreamProcessor = require('./lib/AsyncStreamProcessor')
const StreamChunker = require('./lib/StreamChunker')

Expand Down Expand Up @@ -160,7 +161,7 @@ class DiscordFileSystem {
stream
.on('aborted', () => reject(new Error('file upload aborted'))) // On HTTP request abort delete all the messages and reject promise
.pipe(new StreamChunker(this.chunkSize))
.pipe(new AsyncStreamProcessor(processChunk, this.maxUploadConc))
.pipe(new AsyncStreamProcessorWithConcurrency(processChunk, this.maxUploadConc))
.on('finish', () => resolve(parts))
.on('error', (err) => reject(err))
})
Expand Down
45 changes: 6 additions & 39 deletions src/DFs/lib/AsyncStreamProcessor.js
Original file line number Diff line number Diff line change
@@ -1,49 +1,16 @@
const { Transform } = require('stream')

function cbNoop(cb) {
cb()
}

class AsyncStreamProcessor extends Transform {
constructor(chunkProcessor, maxConcurrency = 1) {
class StreamChunker extends Transform {
constructor(chunkProcessor) {
super()
this.chunkProcessor = chunkProcessor
this.maxConcurrency = maxConcurrency
this.chunkCount = 0

this.lastCallback = undefined
this.pendingFinish = undefined
this.concurrent = 0
this._final = this.callOnFinish(cbNoop)
}

callOnFinish(original) {
return function cbHell(callback) {
if (this.concurrent === 0) original.call(this, callback)
else this.pendingFinish = original.bind(this, callback)
}
}

_transform(chunk, encoding, callback) {
this.concurrent += 1
if (this.concurrent < this.maxConcurrency) {
callback(null)
} else this.lastCallback = callback
this.chunkProcessor(chunk, this.chunkCount)
.then(() => {
this.concurrent -= 1
if (this.lastCallback) {
this.lastCallback()
this.lastCallback = null
}
if (this.concurrent === 0 && this.pendingFinish) {
this.pendingFinish()
this.pendingFinish = null
}
})
.catch((err) => this.emit('error', err))
this.chunkCount += 1
this.chunkProcessor(chunk)
.then(() => callback(null))
.catch((err) => callback(err))
}
}

module.exports = AsyncStreamProcessor
module.exports = StreamChunker
49 changes: 49 additions & 0 deletions src/DFs/lib/AsyncStreamProcessorWithConcurrency.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const { Transform } = require('stream')

function cbNoop(cb) {
cb()
}

class AsyncStreamProcessorWithConcurrency extends Transform {
constructor(chunkProcessor, maxConcurrency = 1) {
super()
this.chunkProcessor = chunkProcessor
this.maxConcurrency = maxConcurrency
this.chunkCount = 0

this.lastCallback = undefined
this.pendingFinish = undefined
this.concurrent = 0
this._final = this.callOnFinish(cbNoop)
}

callOnFinish(original) {
return function cbHell(callback) {
if (this.concurrent === 0) original.call(this, callback)
else this.pendingFinish = original.bind(this, callback)
}
}

_transform(chunk, encoding, callback) {
this.concurrent += 1
if (this.concurrent < this.maxConcurrency) {
callback(null)
} else this.lastCallback = callback
this.chunkProcessor(chunk, this.chunkCount)
.then(() => {
this.concurrent -= 1
if (this.lastCallback) {
this.lastCallback()
this.lastCallback = null
}
if (this.concurrent === 0 && this.pendingFinish) {
this.pendingFinish()
this.pendingFinish = null
}
})
.catch((err) => this.emit('error', err))
this.chunkCount += 1
}
}

module.exports = AsyncStreamProcessorWithConcurrency

0 comments on commit b87e37b

Please sign in to comment.