Skip to content

Commit

Permalink
修改队列处理
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonxu committed Oct 31, 2017
1 parent ed85d74 commit d8f856e
Showing 1 changed file with 49 additions and 11 deletions.
60 changes: 49 additions & 11 deletions src/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ var initTask = function (cos) {
var uploadingFileCount = 0;
var nextUploadIndex = 0;

var originApiMap = {};

// 把上传方法替换成添加任务的方法
util.each([
'putObject',
'sliceUploadFile',
], function (api) {
originApiMap[api] = cos[api];
cos[api] = function (params, callback) {
cos._addTask(api, params, callback);
};
});

// 接口返回简略的任务信息
var formatTask = function (task) {
var t = {
Expand All @@ -26,6 +39,11 @@ var initTask = function (cos) {
return t;
};

var emitListUpdate = function () {
cos.emit('task-list-update', {list: util.map(queue, formatTask)});
cos.emit('list-update', {list: util.map(queue, formatTask)});
};

var startNextTask = function () {
if (nextUploadIndex < queue.length &&
uploadingFileCount < cos.options.FileParallelLimit) {
Expand All @@ -34,10 +52,12 @@ var initTask = function (cos) {
uploadingFileCount++;
task.state = 'checking';
!task.params.UploadData && (task.params.UploadData = {});
cos[task.api](task.params, function (err, data) {
originApiMap[task.api].call(cos, task.params, function (err, data) {
if (!cos._isRunningTask(task.id)) return;
if (task.state === 'checking' || task.state === 'uploading') {
task.state = err ? 'error' : 'success';
uploadingFileCount--;
emitListUpdate();
startNextTask(cos);
task.callback && task.callback(err, data);
if (task.state === 'success') {
Expand All @@ -46,6 +66,7 @@ var initTask = function (cos) {
}
}
});
emitListUpdate();
}
nextUploadIndex++;
startNextTask(cos);
Expand All @@ -56,14 +77,16 @@ var initTask = function (cos) {
var task = tasks[id];
var waiting = task && task.state === 'waiting';
var running = task && (task.state === 'checking' || task.state === 'uploading');
if (waiting || running || (switchToState === 'canceled' && task.state === 'paused')) {
if (switchToState === 'canceled' && task.state !== 'canceled' ||
switchToState === 'paused' && waiting ||
switchToState === 'paused' && running) {
if (switchToState === 'paused' && task.params.Body && typeof task.params.Body.pipe === 'function') {
console.error('stream not support pause');
return;
}
task.state = switchToState;
cos.emit('inner-kill-task', {TaskId: id});
cos.emit('task-update', {task: formatTask(task)});
emitListUpdate();
if (running) {
uploadingFileCount--;
startNextTask(cos);
Expand All @@ -75,17 +98,32 @@ var initTask = function (cos) {
}
};

cos._addTask = function (id, api, params, callback) {
var size;
if (params.Body && params.Body.size) {
cos._addTasks = function (taskList) {
util.each(taskList, function (task) {
task.params.IgnoreAddEvent = true;
cos._addTask(task.api, task.params, task.callback);
});
emitListUpdate();
};

cos._addTask = function (api, params, callback) {

// 生成 id
var id = util.uuid();
params.TaskReady && params.TaskReady(id);

var size = 0;
if (params.Body && params.Body.size !== undefined) {
size = params.Body.size;
} else if (params.Body && params.Body.length) {
} else if (params.Body && params.Body.length !== undefined) {
size = params.Body.length;
} else if (params.ContentLength !== undefined) {
size = params.ContentLength;
}

if (params.ContentLength === undefined) params.ContentLength = size;
params.TaskId = id;

var task = {
// env
params: params,
Expand All @@ -110,7 +148,7 @@ var initTask = function (cos) {
if (!cos._isRunningTask(task.id)) return;
task.hashPercent = info.percent;
onHashProgress && onHashProgress(info);
cos.emit('task-update', {task: formatTask(task)});
emitListUpdate();
};
var onProgress = params.onProgress;
params.onProgress = function (info) {
Expand All @@ -120,11 +158,11 @@ var initTask = function (cos) {
task.speed = info.speed;
task.percent = info.percent;
onProgress && onProgress(info);
cos.emit('task-update', {task: formatTask(task)});
emitListUpdate();
};
queue.push(task);
tasks[id] = task;
cos.emit('task-list-update', {list: util.map(queue, formatTask)});
!params.IgnoreAddEvent && emitListUpdate();
startNextTask(cos);
return id;
};
Expand All @@ -145,7 +183,7 @@ var initTask = function (cos) {
var task = tasks[id];
if (task && (task.state === 'paused' || task.state === 'error')) {
task.state = 'waiting';
cos.emit('task-update', {task: formatTask(task)});
emitListUpdate();
nextUploadIndex = Math.min(nextUploadIndex, task.index);
startNextTask();
}
Expand Down

0 comments on commit d8f856e

Please sign in to comment.