This repository has been archived by the owner on Jun 2, 2024. It is now read-only.

fix: should show new version package count #984

Merged: 1 commit, Jul 3, 2016
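What the diff below does: `SyncModuleWorker` gains an `updates` list next to its existing `successes` and `fails`, a package name is pushed onto it whenever a sync brings in at least one new version, and `sync/sync_all.js` then logs that count and returns it to the caller. The following sketch is not the project code; `SummaryTracker` and its `record` method are hypothetical names used only to illustrate the counting pattern the PR introduces.

```js
'use strict';

// Minimal sketch of the pattern: track which packages actually produced new
// versions so the summary can report an "updates" count next to successes/fails.
function SummaryTracker() {
  this.successes = [];
  this.fails = [];
  this.updates = []; // packages that had at least one new version synced
}

SummaryTracker.prototype.record = function (name, syncedVersions, ok) {
  if (!ok) {
    this.fails.push(name);
    return;
  }
  this.successes.push(name);
  if (syncedVersions.length > 0) {
    // only count packages that actually changed
    this.updates.push(name);
  }
};

// usage example
const tracker = new SummaryTracker();
tracker.record('koa', ['2.0.0'], true); // new version synced
tracker.record('lodash', [], true);     // already up to date
tracker.record('left-pad', [], false);  // sync failed
console.log('successes %d, fails %d, updates %d',
  tracker.successes.length, tracker.fails.length, tracker.updates.length);
// => successes 2, fails 1, updates 1
```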
controllers/sync_module_worker.js: 27 additions, 17 deletions (44 changed lines)
@@ -59,6 +59,7 @@ function SyncModuleWorker(options) {
 
   this.successes = [];
   this.fails = [];
+  this.updates = [];
 }
 
 util.inherits(SyncModuleWorker, EventEmitter);
@@ -127,7 +128,7 @@ SyncModuleWorker.prototype.start = function () {
     // sync upstream
     if (that.syncUpstreamFirst) {
       try {
-        yield* that.syncUpstream(that.startName);
+        yield that.syncUpstream(that.startName);
       } catch (err) {
         logger.error(err);
       }
@@ -180,7 +181,7 @@ SyncModuleWorker.prototype._doneOne = function* (concurrencyId, name, success) {
   var that = this;
   // relase the stack: https://github.com/cnpm/cnpmjs.org/issues/328
   defer.setImmediate(function* () {
-    yield* that.next(concurrencyId);
+    yield that.next(concurrencyId);
   });
 };
 
@@ -300,13 +301,13 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) {
   if (common.isPrivateScopedPackage(name)) {
     this.log('[c#%d] [%s] ignore sync private scoped %j package',
       concurrencyId, name, config.scopes);
-    yield* this._doneOne(concurrencyId, name, true);
+    yield this._doneOne(concurrencyId, name, true);
     return;
   }
 
   // get from npm
   try {
-    var result = yield* npmSerivce.request('/' + name.replace('/', '%2f'));
+    var result = yield npmSerivce.request('/' + name.replace('/', '%2f'));
     pkg = result.data;
     status = result.status;
   } catch (err) {
@@ -333,39 +334,44 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) {
   if (!pkg) {
     that.log('[c#%s] [error] [%s] get package error: package not exists, status: %s',
       concurrencyId, name, status);
-    yield* that._doneOne(concurrencyId, name, true);
+    yield that._doneOne(concurrencyId, name, true);
     return;
   }
 
   that.log('[c#%d] [%s] pkg status: %d, start...', concurrencyId, name, status);
 
   if (unpublishedInfo) {
     try {
-      yield* that._unpublished(name, unpublishedInfo);
+      yield that._unpublished(name, unpublishedInfo);
     } catch (err) {
       that.log('[c#%s] [error] [%s] sync error: %s', concurrencyId, name, err.stack);
-      yield* that._doneOne(concurrencyId, name, false);
+      yield that._doneOne(concurrencyId, name, false);
       return;
     }
-    return yield* that._doneOne(concurrencyId, name, true);
+    return yield that._doneOne(concurrencyId, name, true);
   }
 
   var versions;
   try {
-    versions = yield* that._sync(name, pkg);
+    versions = yield that._sync(name, pkg);
   } catch (err) {
     that.log('[c#%s] [error] [%s] sync error: %s', concurrencyId, name, err.stack);
-    yield* that._doneOne(concurrencyId, name, false);
+    yield that._doneOne(concurrencyId, name, false);
     return;
   }
 
+  // has new version
+  if (versions.length > 0) {
+    that.updates.push(name);
+  }
+
   this.log('[c#%d] [%s] synced success, %d versions: %s',
     concurrencyId, name, versions.length, versions.join(', '));
-  yield* this._doneOne(concurrencyId, name, true);
+  yield this._doneOne(concurrencyId, name, true);
 };
 
 function* _listStarUsers(modName) {
-  var users = yield* packageService.listStarUserNames(modName);
+  var users = yield packageService.listStarUserNames(modName);
   var userMap = {};
   users.forEach(function (user) {
     userMap[user] = true;
@@ -663,7 +669,7 @@ SyncModuleWorker.prototype._sync = function* (name, pkg) {
       continue;
     }
     try {
-      yield* that._syncOneVersion(index, syncModule);
+      yield that._syncOneVersion(index, syncModule);
       syncedVersionNames.push(syncModule.version);
     } catch (err) {
       that.log(' [%s:%d] sync error, version: %s, %s: %s',
}

// add module dependence
yield* packageService.addDependencies(sourcePackage.name, dependencies);
yield packageService.addDependencies(sourcePackage.name, dependencies);

var shasum = crypto.createHash('sha1');
var dataSize = 0;
@@ -945,6 +951,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
       var err = new Error('Download ' + downurl + ' fail, status: ' + statusCode);
       err.name = 'DownloadTarballError';
       err.data = sourcePackage;
+      logger.syncInfo('[sync_module_worker] %s', err.message);
       throw err;
     }
 
@@ -961,6 +968,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
       var err = new Error('Download ' + downurl + ' file size is zero');
       err.name = 'DownloadTarballSizeZeroError';
       err.data = sourcePackage;
+      logger.syncInfo('[sync_module_worker] %s', err.message);
       throw err;
     }
 
@@ -971,6 +979,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
         ' not match ' + sourcePackage.dist.shasum);
       err.name = 'DownloadTarballShasumError';
       err.data = sourcePackage;
+      logger.syncInfo('[sync_module_worker] %s', err.message);
       throw err;
     }
 
@@ -989,8 +998,9 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
       throw err;
     }
     logger.syncInfo('[sync_module_worker] uploaded, saving %j to database', result);
-    var r = yield *afterUpload(result);
-    logger.syncInfo('[sync_module_worker] sync %s@%s done!', sourcePackage.name, sourcePackage.version);
+    var r = yield afterUpload(result);
+    logger.syncInfo('[sync_module_worker] sync %s@%s done!',
+      sourcePackage.name, sourcePackage.version);
     return r;
   } finally {
     // remove tmp file whatever
@@ -1034,7 +1044,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
   }
 
   mod.package.dist = dist;
-  var r = yield* packageService.saveModule(mod);
+  var r = yield packageService.saveModule(mod);
 
   that.log(' [%s:%s] done, insertId: %s, author: %s, version: %s, '
     + 'size: %d, publish_time: %j, publish on cnpm: %s',
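Besides the new counter, most edits in this file replace `yield*` delegation with a plain `yield` on generator calls. With the co runner this codebase uses (it appears in `sync/status.js` below), both forms resolve to the same value, because co recognizes a yielded generator object and runs it itself. The snippet below is a standalone illustration of that equivalence, not project code.

```js
'use strict';
const co = require('co');

function* inner() {
  return 42;
}

co(function* () {
  const a = yield* inner(); // generator delegation: runs inline, co never sees it
  const b = yield inner();  // co detects the yielded generator object and drives it
  console.log(a === b);     // true
}).catch(err => console.error(err));
```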
sync/changes_stream_syncer.js: 4 additions, 2 deletions (6 changed lines)
@@ -15,6 +15,7 @@ let _STREAM_ID = 0;
 module.exports = function* sync() {
   const since = yield getLastSequence();
   const streamId = _STREAM_ID++;
+  let changesCount = 0;
   logger.syncInfo('start changes stream#%d, since: %s', streamId, since);
   const changes = new ChangesStream({
     db,
@@ -23,7 +24,8 @@
   });
   changes.await = streamAwait;
   changes.on('data', change => {
-    logger.syncInfo('stream#%d get change: %j', streamId, change);
+    changesCount++;
+    logger.syncInfo('stream#%d get change#%d: %j', streamId, changesCount, change);
     syncPackage(change);
   });
 
@@ -32,7 +34,7 @@
   } catch (err) {
     // make sure changes steam is destroy
     changes.destroy();
-    err.message += `, stream#${streamId}`;
+    err.message += `, stream#${streamId}, changesCount#${changesCount}`;
     throw err;
   }
 };
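The counter added here simply numbers the `data` events, so both the per-change log line and any stream error record how many changes were delivered before the failure. A minimal stand-in using a plain EventEmitter instead of the registry ChangesStream (names assumed, not project code):

```js
'use strict';
const EventEmitter = require('events');

// Stand-in for ChangesStream: only the 'data' events matter for the counter.
const changes = new EventEmitter();
const streamId = 0;
let changesCount = 0;

changes.on('data', change => {
  changesCount++;
  console.log('stream#%d get change#%d: %j', streamId, changesCount, change);
});

changes.emit('data', { seq: 1, id: 'koa' });
changes.emit('data', { seq: 2, id: 'lodash' });

// On failure, the count is appended to the error message for easier debugging.
const err = new Error('changes stream broken');
err.message += `, stream#${streamId}, changesCount#${changesCount}`;
console.error(err.message); // changes stream broken, stream#0, changesCount#2
```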
sync/status.js: 1 addition, 1 deletion (2 changed lines)
@@ -22,7 +22,7 @@ Status.prototype.log = function (syncDone) {
     lastSyncModule: this.lastSyncModule,
   };
   co(function* () {
-    yield* Total.updateSyncNum(params);
+    yield Total.updateSyncNum(params);
   }).catch(function () {});
 };
 
sync/sync_all.js: 8 additions, 5 deletions (13 changed lines)
@@ -57,19 +57,22 @@ module.exports = function* sync() {
     concurrency: config.syncConcurrency,
     syncUpstreamFirst: false,
   });
-  Status.init({need: packages.length}, worker);
+  Status.init({
+    need: packages.length,
+  }, worker);
   worker.start();
   var end = thunkify.event(worker);
   yield end();
 
-  logger.syncInfo('All packages sync done, successes %d, fails %d',
-    worker.successes.length, worker.fails.length);
+  logger.syncInfo('All packages sync done, successes %d, fails %d, updates %d',
+    worker.successes.length, worker.fails.length, worker.updates.length);
   //only when all succss, set last sync time
   if (!worker.fails.length) {
-    yield* totalService.setLastSyncTime(syncTime);
+    yield totalService.setLastSyncTime(syncTime);
   }
   return {
     successes: worker.successes,
-    fails: worker.fails
+    fails: worker.fails,
+    updates: worker.updates,
   };
 };
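With the extra field in place, a hypothetical caller of this module (the require path and log wording are assumptions for illustration, not part of the diff) could report how many packages actually received new versions:

```js
'use strict';
const co = require('co');
// path assumed for illustration; adjust to wherever sync_all.js lives
const syncAll = require('./sync/sync_all');

co(function* () {
  const result = yield syncAll(); // co runs the generator returned by sync()
  console.log('sync round done: %d ok, %d failed, %d with new versions',
    result.successes.length, result.fails.length, result.updates.length);
}).catch(err => console.error(err.stack));
```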