diff --git a/common/logger.js b/common/logger.js index 8c8892635..4d53fb4ab 100644 --- a/common/logger.js +++ b/common/logger.js @@ -1,20 +1,6 @@ -/**! - * cnpmjs.org - common/logger.js - * - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse (http://deadhorse.me) - * fengmk2 (http://fengmk2.github.com) - */ - 'use strict'; -/** - * Module dependencies. - */ - +var debug = require('debug')('cnpmjs.org:logger'); var formater = require('error-formater'); var Logger = require('mini-logger'); var utility = require('utility'); @@ -50,6 +36,9 @@ logger.syncInfo = function () { if (typeof args[0] === 'string') { args[0] = util.format('[%s][%s] ', utility.logDate(), process.pid) + args[0]; } + if (debug.enabled) { + debug.apply(debug, args); + } logger.sync_info.apply(logger, args); }; @@ -58,5 +47,8 @@ logger.syncError =function () { if (typeof args[0] === 'string') { args[0] = util.format('[%s][%s] ', utility.logDate(), process.pid) + args[0]; } + if (debug.enabled) { + debug.apply(debug, args); + } logger.sync_error.apply(logger, arguments); }; diff --git a/config/index.js b/config/index.js index 899d6bf69..98de20247 100644 --- a/config/index.js +++ b/config/index.js @@ -1,18 +1,5 @@ -/** - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse - * fengmk2 (http://fengmk2.com) - */ - 'use strict'; -/** - * Module dependencies. - */ - var mkdirp = require('mkdirp'); var copy = require('copy-to'); var path = require('path'); @@ -26,6 +13,7 @@ var dataDir = path.join(process.env.HOME || root, '.cnpmjs.org'); var config = { version: version, + dataDir: dataDir, /** * Cluster mode @@ -209,6 +197,10 @@ var config = { // sync devDependencies or not, default is false syncDevDependencies: false, + // changes streaming sync + syncChangesStream: false, + handleSyncRegistry: 'http://127.0.0.1:7001', + // badge subject on http://shields.io/ badgePrefixURL: 'https://img.shields.io/badge', badgeSubject: 'cnpm', diff --git a/controllers/sync_module_worker.js b/controllers/sync_module_worker.js index 319132713..01d221342 100644 --- a/controllers/sync_module_worker.js +++ b/controllers/sync_module_worker.js @@ -1,18 +1,5 @@ -/**! - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * fengmk2 (http://fengmk2.com) - * dead_horse (http://deadhorse.me) - */ - 'use strict'; -/** - * Module dependencies. - */ - var debug = require('debug')('cnpmjs.org:sync_module_worker'); var co = require('co'); var gather = require('co-gather'); diff --git a/controllers/total.js b/controllers/total.js index 4a7b6cb31..775613617 100644 --- a/controllers/total.js +++ b/controllers/total.js @@ -1,18 +1,5 @@ -/**! - * Copyright(c) cnpm and other contributors. - * MIT Licensed - * - * Authors: - * fengmk2 (http://fengmk2.com) - * dead_horse (http://deadhorse.me) - */ - 'use strict'; -/** - * Module dependencies. - */ - const Total = require('../services/total'); const version = require('../package.json').version; const config = require('../config'); diff --git a/dispatch.js b/dispatch.js index 0db1c0056..1a1a7a8e9 100644 --- a/dispatch.js +++ b/dispatch.js @@ -1,18 +1,5 @@ -/**! - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse - * fengmk2 (http://fengmk2.com) - */ - 'use strict'; -/** - * Module dependencies. - */ - var childProcess = require('child_process'); var path = require('path'); var util = require('util'); diff --git a/package.json b/package.json index b0f459fc9..210b9ffe6 100644 --- a/package.json +++ b/package.json @@ -14,8 +14,10 @@ }, "dependencies": { "agentkeepalive": "~2.1.0", + "await-event": "^1.0.0", "bytes": "~2.4.0", "cfork": "~1.4.0", + "changes-stream": "^1.1.0", "co": "~4.6.0", "co-defer": "~1.0.0", "co-gather": "~0.0.1", @@ -44,6 +46,7 @@ "mkdirp": "~0.5.0", "moment": "~2.12.0", "mysql": "~2.10.2", + "mz": "^2.4.0", "nodemailer": "~1.3.0", "semver": "~5.1.0", "sequelize": "~3.21.0", diff --git a/sync/changes_stream_syncer.js b/sync/changes_stream_syncer.js new file mode 100644 index 000000000..67a2e4362 --- /dev/null +++ b/sync/changes_stream_syncer.js @@ -0,0 +1,75 @@ +'use strict'; + +const ChangesStream = require('changes-stream'); +const path = require('path'); +const fs = require('mz/fs'); +const os = require('os'); +const urllib = require('urllib'); +const streamAwait = require('await-event') +const logger = require('../common/logger'); +const config = require('../config'); + +const db = 'https://replicate.npmjs.com'; +const lastSeqFile = path.join(config.dataDir, '.cnpmjs.org.last_seq.txt'); + +module.exports = function* sync() { + const since = yield getLastSequence(); + logger.syncInfo('start changes stream, since: %s', since); + const changes = new ChangesStream({ + db, + since, + include_docs: false, + }); + changes.await = streamAwait; + changes.on('data', change => { + logger.syncInfo('Get change: %j', change); + syncPackage(change); + }); + + yield changes.await('error'); +}; + +function syncPackage(change) { + const url = `${config.handleSyncRegistry}/${change.id}/sync`; + urllib.request(url, { + method: 'PUT', + dataType: 'json', + timeout: 10000, + }, (err, data, res) => { + if (err) { + logger.syncInfo('%s:%s PUT %s error: %s, retry after 5s', + change.seq, change.id, url, err); + logger.syncError(err); + syncPackage(change); + setTimeout(() => syncPackage(change), 5000); + } else { + saveLastSequence(change.seq); + logger.syncInfo('%s:%s sync request sent, log: %s/log/%s', + change.seq, change.id, url, data.logId); + } + }); +} + +function* getLastSequence() { + let lastSeq; + if (yield fs.exists(lastSeqFile)) { + lastSeq = yield fs.readFile(lastSeqFile, 'utf8'); + lastSeq = Number(lastSeq); + } + if (!lastSeq) { + lastSeq = 2614765; + } + // const r = yield urllib.request(db, { + // dataType: 'json', + // timeout: 15000, + // }); + // logger.syncInfo('get registry info: %j', r.data); + // if (lastSeq < r.data.update_seq) { + // lastSeq = r.data.update_seq; + // } + return lastSeq; +} + +function saveLastSequence(seq) { + fs.writeFile(lastSeqFile, String(seq), () => {}); +} diff --git a/sync/index.js b/sync/index.js index 04474a12a..3557d4e69 100644 --- a/sync/index.js +++ b/sync/index.js @@ -1,36 +1,23 @@ -/** - * Copyright(c) cnpm and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse (http://deadhorse.me) - * fengmk2 (http://fengmk2.com) - */ - 'use strict'; -/** - * Module dependencies. - */ - -var debug = require('debug')('cnpmjs.org:sync:index'); -var co = require('co'); -var ms = require('humanize-ms'); -var util = require('util'); -var config = require('../config'); -var mail = require('../common/mail'); -var logger = require('../common/logger'); -var totalService = require('../services/total'); +const debug = require('debug')('cnpmjs.org:sync:index'); +const co = require('co'); +const ms = require('humanize-ms'); +const util = require('util'); +const config = require('../config'); +const mail = require('../common/mail'); +const logger = require('../common/logger'); +const totalService = require('../services/total'); -var sync = null; +let sync = null; switch (config.syncModel) { -case 'all': - sync = require('./sync_all'); - break; -case 'exist': - sync = require('./sync_exist'); - break; + case 'all': + sync = require('./sync_all'); + break; + case 'exist': + sync = require('./sync_exist'); + break; } if (!sync && config.enableCluster) { @@ -38,34 +25,72 @@ if (!sync && config.enableCluster) { process.exit(0); } -console.log('[%s] [sync_worker:%s] syncing with %s mode', - Date(), process.pid, config.syncModel); +console.log('[%s] [sync_worker:%s] syncing with %s mode, changesStreamingSync: %s', + Date(), process.pid, config.syncModel, config.changesStreamingSync); function onerror(err) { logger.error(err); } -//set sync_status = 0 at first +// set sync_status = 0 at first co(function* () { yield totalService.updateSyncStatus(0); yield checkSyncStatus(); }).catch(onerror); -var syncInterval = ms(config.syncInterval); -var minSyncInterval = ms('5m'); +let syncInterval = ms(config.syncInterval); +const minSyncInterval = ms('5m'); if (!syncInterval || syncInterval < minSyncInterval) { syncInterval = minSyncInterval; } -// the same time only sync once -var syncing = false; -var syncFn = co.wrap(function* () { - debug('mode: %s, syncing: %s', config.syncModel, syncing); - if (!syncing) { +if (sync) { + // the same time only sync once + let syncing = false; + const syncFn = co.wrap(function*() { + debug('mode: %s, syncing: %s', config.syncModel, syncing); + if (!syncing) { + syncing = true; + debug('start syncing'); + let data; + let error; + try { + data = yield sync(); + } catch (err) { + error = err; + error.message += ' (sync package error)'; + logger.syncError(error); + } + data && logger.syncInfo(data); + if (!config.debug) { + sendMailToAdmin(error, data, new Date()); + } + syncing = false; + } + + // check last_sync_time and last_exist_sync_time + yield checkSyncStatus(); + }); + + syncFn().catch(onerror); + setInterval(() => syncFn().catch(onerror), syncInterval); +} + +/** + * sync popular modules + */ + +if (config.syncPopular) { + const sync = require('./sync_popular'); + let syncing = false; + const syncFn = co.wrap(function*() { + if (syncing) { + return; + } syncing = true; - debug('start syncing'); - var data; - var error; + logger.syncInfo('Start syncing popular modules...'); + let data; + let error; try { data = yield sync(); } catch (err) { @@ -73,76 +98,53 @@ var syncFn = co.wrap(function* () { error.message += ' (sync package error)'; logger.syncError(error); } - data && logger.syncInfo(data); + + if (data) { + logger.syncInfo(data); + } if (!config.debug) { sendMailToAdmin(error, data, new Date()); } syncing = false; - } - - // check last_sync_time and last_exist_sync_time - yield checkSyncStatus(); -}); + }); -if (sync) { syncFn().catch(onerror); - setInterval(function () { - syncFn().catch(onerror); - }, syncInterval); + setInterval(() => syncFn().catch(onerror), ms(config.syncPopularInterval)); } -/** - * sync popular modules - */ - -var startSyncPopular = require('./sync_popular'); -var syncingPopular = false; -var syncPopularFn = co.wrap(function* syncPopular() { - if (syncingPopular) { - return; - } - syncingPopular = true; - logger.syncInfo('Start syncing popular modules...'); - var data; - var error; - try { - data = yield startSyncPopular(); - } catch (err) { - error = err; - error.message += ' (sync package error)'; - logger.syncError(error); - } - - if (data) { - logger.syncInfo(data); - } - if (!config.debug) { - sendMailToAdmin(error, data, new Date()); - } - - syncingPopular = false; -}); +if (config.syncChangesStream) { + const sync = require('./changes_stream_syncer'); + let syncing = false; + const syncFn = co.wrap(function*() { + if (syncing) { + return; + } + syncing = true; + logger.syncInfo('Start changes stream syncing...'); + try { + yield sync(); + } catch (err) { + err.message += ' (sync changes stream error)'; + logger.syncError(err); + } + syncing = false; + }); -if (config.syncPopular) { - syncPopularFn().catch(onerror); - setInterval(function () { - syncPopularFn().catch(onerror); - }, ms(config.syncPopularInterval)); -} else { - logger.syncInfo('sync popular module disable'); + syncFn().catch(onerror); + setInterval(() => syncFn().catch(onerror), ms('1m')); } function sendMailToAdmin(err, result, syncTime) { result = result || {}; - var to = []; + const to = []; for (var name in config.admins) { to.push(config.admins[name]); } debug('Send email to all admins: %j, with err message: %s, result: %j, start sync time: %s.', to, err ? err.message : '', result, syncTime); - var subject; - var type; - var html; + let subject; + let type; + let html; if (err) { // ignore 503 error if (err.status === 503) { @@ -169,7 +171,7 @@ function sendMailToAdmin(err, result, syncTime) { debug('send email with type: %s, subject: %s, html: %s', type, subject, html); logger.syncInfo('send email with type: %s, subject: %s, html: %s', type, subject, html); if (type && type !== 'log') { - mail[type](to, subject, html, function (err) { + mail[type](to, subject, html, err => { if (err) { logger.error(err); } @@ -182,8 +184,8 @@ function* checkSyncStatus() { if (!config.sourceNpmRegistryIsCNpm) { return; } - var total = yield totalService.getTotalInfo(); - var lastSyncTime; + const total = yield totalService.getTotalInfo(); + let lastSyncTime; if (config.syncModel === 'all') { lastSyncTime = total.last_sync_time; } else if (config.syncModel === 'exist') { @@ -193,11 +195,11 @@ function* checkSyncStatus() { if (!lastSyncTime) { return; } - var diff = Date.now() - lastSyncTime; - var oneDay = 3600000 * 24; - var maxTime = Math.max(oneDay, syncInterval * 2); + const diff = Date.now() - lastSyncTime; + const oneDay = 3600000 * 24; + const maxTime = Math.max(oneDay, syncInterval * 2); if (diff > maxTime) { - var err = new Error('Last sync time is expired in ' + diff + ' ms, lastSyncTime: ' + + const err = new Error('Last sync time is expired in ' + diff + ' ms, lastSyncTime: ' + new Date(lastSyncTime) + ', maxTime: ' + maxTime + ' ms'); err.name = 'SyncExpiredError'; sendMailToAdmin(err, null, new Date()); diff --git a/sync/status.js b/sync/status.js index 75fccf9e7..5faa3eb88 100644 --- a/sync/status.js +++ b/sync/status.js @@ -1,19 +1,5 @@ -/*! - * cnpmjs.org - sync/status.js - * - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse (http://deadhorse.me) - */ - 'use strict'; -/** - * Module dependencies. - */ - var debug = require('debug')('cnpmjs.org:sync:status'); var co = require('co'); var Total = require('../services/total'); diff --git a/sync/sync_all.js b/sync/sync_all.js index a4d369db7..b00f30edc 100644 --- a/sync/sync_all.js +++ b/sync/sync_all.js @@ -1,20 +1,5 @@ -/**! - * cnpmjs.org - sync/sync_all.js - * - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse (http://deadhorse.me) - * fengmk2 (http://fengmk2.com) - */ - 'use strict'; -/** - * Module dependencies. - */ - var thunkify = require('thunkify-wrap'); var config = require('../config'); var Status = require('./status'); diff --git a/sync/sync_exist.js b/sync/sync_exist.js index d521572a0..2575bb556 100644 --- a/sync/sync_exist.js +++ b/sync/sync_exist.js @@ -1,17 +1,5 @@ -/*! - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse (http://deadhorse.me) - */ - 'use strict'; -/** - * Module dependencies. - */ - var debug = require('debug')('cnpmjs.org:sync:sync_exist'); var Status = require('./status'); var thunkify = require('thunkify-wrap'); diff --git a/sync/sync_popular.js b/sync/sync_popular.js index c0e12a1e2..19fbc0248 100644 --- a/sync/sync_popular.js +++ b/sync/sync_popular.js @@ -1,19 +1,5 @@ -/*! - * cnpmjs.org - sync/sync_popular.js - * - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * dead_horse (http://deadhorse.me) - */ - 'use strict'; -/** - * Module dependencies. - */ - var debug = require('debug')('cnpmjs.org:sync:sync_popular'); var thunkify = require('thunkify-wrap'); var config = require('../config'); diff --git a/sync/sync_since.js b/sync/sync_since.js index 2f4730026..62deeebb3 100644 --- a/sync/sync_since.js +++ b/sync/sync_since.js @@ -1,19 +1,5 @@ -/**! - * sync packages since by some days ago - * - * Copyright(c) cnpmjs.org and other contributors. - * MIT Licensed - * - * Authors: - * fengmk2 (http://fengmk2.com) - */ - 'use strict'; -/** - * Module dependencies. - */ - const thunkify = require('thunkify-wrap'); const co = require('co'); const ms = require('humanize-ms');