Skip to content
This repository has been archived by the owner on Jun 2, 2024. It is now read-only.

Commit

Permalink
feat: add changes stream syncer (#970)
Browse files Browse the repository at this point in the history
* feat: add changes stream syncer

fix scope package missing problem

close #908

* refactor: use let instead of var
  • Loading branch information
fengmk2 authored and dead-horse committed Jun 25, 2016
1 parent ee1e084 commit 6485e4b
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 236 deletions.
22 changes: 7 additions & 15 deletions common/logger.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,6 @@
/**!
* cnpmjs.org - common/logger.js
*
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <dead_horse@qq.com> (http://deadhorse.me)
* fengmk2 <fengmk2@gmail.com> (http://fengmk2.github.com)
*/

'use strict';

/**
* Module dependencies.
*/

var debug = require('debug')('cnpmjs.org:logger');
var formater = require('error-formater');
var Logger = require('mini-logger');
var utility = require('utility');
Expand Down Expand Up @@ -50,6 +36,9 @@ logger.syncInfo = function () {
if (typeof args[0] === 'string') {
args[0] = util.format('[%s][%s] ', utility.logDate(), process.pid) + args[0];
}
if (debug.enabled) {
debug.apply(debug, args);
}
logger.sync_info.apply(logger, args);
};

Expand All @@ -58,5 +47,8 @@ logger.syncError =function () {
if (typeof args[0] === 'string') {
args[0] = util.format('[%s][%s] ', utility.logDate(), process.pid) + args[0];
}
if (debug.enabled) {
debug.apply(debug, args);
}
logger.sync_error.apply(logger, arguments);
};
18 changes: 5 additions & 13 deletions config/index.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <dead_horse@qq.com>
* fengmk2 <m@fengmk2.com> (http://fengmk2.com)
*/

'use strict';

/**
* Module dependencies.
*/

var mkdirp = require('mkdirp');
var copy = require('copy-to');
var path = require('path');
Expand All @@ -26,6 +13,7 @@ var dataDir = path.join(process.env.HOME || root, '.cnpmjs.org');

var config = {
version: version,
dataDir: dataDir,

/**
* Cluster mode
Expand Down Expand Up @@ -209,6 +197,10 @@ var config = {
// sync devDependencies or not, default is false
syncDevDependencies: false,

// changes streaming sync
syncChangesStream: false,
handleSyncRegistry: 'http://127.0.0.1:7001',

// badge subject on http://shields.io/
badgePrefixURL: 'https://img.shields.io/badge',
badgeSubject: 'cnpm',
Expand Down
13 changes: 0 additions & 13 deletions controllers/sync_module_worker.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**!
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* fengmk2 <fengmk2@gmail.com> (http://fengmk2.com)
* dead_horse <dead_horse@qq.com> (http://deadhorse.me)
*/

'use strict';

/**
* Module dependencies.
*/

var debug = require('debug')('cnpmjs.org:sync_module_worker');
var co = require('co');
var gather = require('co-gather');
Expand Down
13 changes: 0 additions & 13 deletions controllers/total.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**!
* Copyright(c) cnpm and other contributors.
* MIT Licensed
*
* Authors:
* fengmk2 <fengmk2@gmail.com> (http://fengmk2.com)
* dead_horse <dead_horse@qq.com> (http://deadhorse.me)
*/

'use strict';

/**
* Module dependencies.
*/

const Total = require('../services/total');
const version = require('../package.json').version;
const config = require('../config');
Expand Down
13 changes: 0 additions & 13 deletions dispatch.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/**!
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <dead_horse@qq.com>
* fengmk2 <fengmk2@gmail.com> (http://fengmk2.com)
*/

'use strict';

/**
* Module dependencies.
*/

var childProcess = require('child_process');
var path = require('path');
var util = require('util');
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
},
"dependencies": {
"agentkeepalive": "~2.1.0",
"await-event": "^1.0.0",
"bytes": "~2.4.0",
"cfork": "~1.4.0",
"changes-stream": "^1.1.0",
"co": "~4.6.0",
"co-defer": "~1.0.0",
"co-gather": "~0.0.1",
Expand Down Expand Up @@ -44,6 +46,7 @@
"mkdirp": "~0.5.0",
"moment": "~2.12.0",
"mysql": "~2.10.2",
"mz": "^2.4.0",
"nodemailer": "~1.3.0",
"semver": "~5.1.0",
"sequelize": "~3.21.0",
Expand Down
75 changes: 75 additions & 0 deletions sync/changes_stream_syncer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
'use strict';

const ChangesStream = require('changes-stream');
const path = require('path');
const fs = require('mz/fs');
const os = require('os');
const urllib = require('urllib');
const streamAwait = require('await-event')
const logger = require('../common/logger');
const config = require('../config');

const db = 'https://replicate.npmjs.com';
const lastSeqFile = path.join(config.dataDir, '.cnpmjs.org.last_seq.txt');

module.exports = function* sync() {
const since = yield getLastSequence();
logger.syncInfo('start changes stream, since: %s', since);
const changes = new ChangesStream({
db,
since,
include_docs: false,
});
changes.await = streamAwait;
changes.on('data', change => {
logger.syncInfo('Get change: %j', change);
syncPackage(change);
});

yield changes.await('error');
};

function syncPackage(change) {
const url = `${config.handleSyncRegistry}/${change.id}/sync`;
urllib.request(url, {
method: 'PUT',
dataType: 'json',
timeout: 10000,
}, (err, data, res) => {
if (err) {
logger.syncInfo('%s:%s PUT %s error: %s, retry after 5s',
change.seq, change.id, url, err);
logger.syncError(err);
syncPackage(change);
setTimeout(() => syncPackage(change), 5000);
} else {
saveLastSequence(change.seq);
logger.syncInfo('%s:%s sync request sent, log: %s/log/%s',
change.seq, change.id, url, data.logId);
}
});
}

function* getLastSequence() {
let lastSeq;
if (yield fs.exists(lastSeqFile)) {
lastSeq = yield fs.readFile(lastSeqFile, 'utf8');
lastSeq = Number(lastSeq);
}
if (!lastSeq) {
lastSeq = 2614765;
}
// const r = yield urllib.request(db, {
// dataType: 'json',
// timeout: 15000,
// });
// logger.syncInfo('get registry info: %j', r.data);
// if (lastSeq < r.data.update_seq) {
// lastSeq = r.data.update_seq;
// }
return lastSeq;
}

function saveLastSequence(seq) {
fs.writeFile(lastSeqFile, String(seq), () => {});
}
Loading

0 comments on commit 6485e4b

Please sign in to comment.