Skip to content

Commit

Permalink
feat: support more loadbalance algorithm (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer authored Jul 30, 2018
1 parent 4fa47a9 commit 4608ddd
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 56 deletions.
80 changes: 38 additions & 42 deletions lib/client/address_group.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ const debug = require('debug')('rpc-client#AddressGroup');
const assert = require('assert');
const Base = require('sdk-base');
const utils = require('./utils');
const utility = require('utility');
const Table = require('easy-table');
const sleep = require('mz-modules/sleep');
const DynamicConfig = require('./dynamic_config');
const HealthCounter = require('./metric/health_counter');
const createLoadBalancer = require('./loadbalancer');

const defaultOptions = {
loadbalancerClass: 'roundRobin',
retryFaultInterval: 30000, // 30s 重新尝试连接下不可用的地址
};
const NA = 'N/A';
Expand All @@ -30,7 +31,6 @@ class AddressGroup extends Base {
this._allAddressList = null; // 原始地址列表
this._totalSize = 0; // 完整的地址数量
this._unChoosedAddressList = null; // 未被选中的地址
this._offset = 0; // 用于 rr 轮询地址的偏移值
this._faultAddressMap = new Map(); // 故障地址列表
this._weightMap = new Map(); // <host, weight>
this._maxIsolatedNum = 0; // 最大的故障摘除数量,如果故障机器超过这个值则禁用摘除功能
Expand All @@ -39,11 +39,13 @@ class AddressGroup extends Base {
this._connectionPoolSize = this.connectionPoolConfig.initConnectionSize;
const config = DynamicConfig.instance.metric;
this._maxIdleWindow = config.numBuckets * config.bucketSizeInMs;
this._loadbalancer = createLoadBalancer(this);

// 每个 window 周期更新一遍权重,权重区间 [0, 10],0 代表地址被摘除了
this.ready(err => {
if (!err && !this._closed) {
this._healthCounter = HealthCounter.getInstance(this.key);
// HealthCounter.getInstance(key, prepend) prepend => false,确保 avgCounter 在最后触发
this._healthCounter = HealthCounter.getInstance(this.key, false);
this._healthCounter.on('next', hc => {
try {
this._onNext(hc);
Expand Down Expand Up @@ -100,14 +102,18 @@ class AddressGroup extends Base {
return this.options.connectionManager;
}

get loadbalancerClass() {
return this.options.loadbalancerClass;
}

get addressList() {
return this._addressList;
}

set addressList(val) {
this._addressList = this._chooseAddresses(val);
this._choosedSize = this._addressList.length;
this._offset = utility.random(this._choosedSize);
this._loadbalancer.reset();
// 最大熔断个数,为了防止「雪崩」,超过这个数,则关闭熔断
this._maxIsolatedNum = Math.ceil(this.faultToleranceConfig.maxIsolatedPercentage * this._choosedSize);

Expand Down Expand Up @@ -200,9 +206,6 @@ class AddressGroup extends Base {
const avgRTStr = avgRT + 'ms';
const totalRequestCount = hc.totalCount;

this.logger.debug('[AddressGroup] group: %s, total request count: %d, avg rt: %s, avg error rate: %s, address count: %d',
this.key, totalRequestCount, avgRTStr, avgErrorRateStr, this._choosedSize);

if (this.connectionPoolConfig.elasticControl) {
// 根据上个窗口的总请求数来计算需要多少个 connection
const capacityPerConnection = this.connectionPoolConfig.capacityPerConnection;
Expand Down Expand Up @@ -278,25 +281,28 @@ class AddressGroup extends Base {
// 心跳,确保不会被断开
conn.heartbeat();
}
table.cell('NO.', ++i);
table.cell('Address', address.host);
table.cell('Cur Weight', weight);
table.cell('Pre Weight', preWeight);
table.cell('Total Count', totalCount);
table.cell('Error Count', errorCount);
table.cell('Error Rate', errorRateStr + ' / ' + avgErrorRateStr + ' = ' + errorMultiple);
table.cell('RT', rtStr + ' / ' + avgRTStr + ' = ' + rtMultiple);
table.newRow();
if (preWeight !== weight) {
table.cell('NO.', ++i);
table.cell('Address', address.host);
table.cell('Cur Weight', weight);
table.cell('Pre Weight', preWeight);
table.cell('Total Count', totalCount);
table.cell('Error Count', errorCount);
table.cell('Error Rate', errorRateStr + ' / ' + avgErrorRateStr + ' = ' + errorMultiple);
table.cell('RT', rtStr + ' / ' + avgRTStr + ' = ' + rtMultiple);
table.newRow();
changed = true;
}
// 调控计数(比初始权重小则认为被调控)
if (weight < DEFAULT_WEIGHT) {
degradeCount++;
}
changed = changed || preWeight !== weight;
this._weightMap.set(address.host, weight);
}
// 避免太多日志
if (changed || debug.enabled) {
this.logger.info(table.toString());
this.logger.info('[AddressGroup] group: %s weight %s, total request count: %d, avg rt: %s, avg error rate: %s, address count: %d\n%s',
this.key, changed ? 'changed' : 'unchanged', totalRequestCount, avgRTStr, avgErrorRateStr, this._choosedSize, table.toString());
}
// 如果一次降级的太多,可能造成流量全部打到部分机器,从而雪崩,所以超过某个阀值后禁用调控
if (degradeCount <= this._maxIsolatedNum) {
Expand Down Expand Up @@ -342,43 +348,33 @@ class AddressGroup extends Base {
}
}

// 带权重的 Round Robin 算法
rr() {
const address = this.addressList[this._offset];
this._offset = (this._offset + 1) % this._choosedSize;
getWeight(address) {
const conn = this.connectionManager.get(address);
// connection 为 null 说明地址根本连不上,直接跳过
if (!conn) {
this._faultAddressMap.set(address.host, address);
return null;
return 0;
}
if (!this.degradeEnable) return conn;
// 不支持降级的话,直接返回默认权重
if (!this.degradeEnable) return DEFAULT_WEIGHT;

const weight = this._weightMap.get(address.host) || DEFAULT_WEIGHT;
if (weight === DEFAULT_WEIGHT) return conn;
if (weight === 0) return null;

const randNum = utility.random(DEFAULT_WEIGHT);
const isIdleTooLong = Date.now() - conn.lastInvokeTime > this._maxIdleWindow;
debug('rr with weight algorithm, randNum: %s, weight: %s, isIdleTooLong: %s', randNum, weight, isIdleTooLong);
return weight >= randNum || isIdleTooLong ? conn : null;
let weight = this._weightMap.get(address.host) || DEFAULT_WEIGHT;
// 长时间没有被路由到的话,需要给一次机会做 single check
if (weight < DEFAULT_WEIGHT && Date.now() - conn.lastInvokeTime > this._maxIdleWindow) {
weight = DEFAULT_WEIGHT;
}
return weight;
}

async getConnection(req) {
const meta = req.meta;
meta.connectionGroup = this.key;

if (!this._choosedSize) return null;
const address = this._loadbalancer.select(req);
if (!address) return null;

let conn;
let count = this._choosedSize;
while (count--) {
conn = this.rr();
if (conn) return conn;
}
// 如果所有地址都轮询一遍以后没找到,再兜底一下
const address = this.addressList[this._offset];
return await this.connectionManager.createAndGet(address, this.options.connectionOpts, this.options.connectionClass);
const { connectionOpts, connectionClass } = this.options;
return await this.connectionManager.createAndGet(address, connectionOpts, connectionClass);
}

close() {
Expand Down
1 change: 1 addition & 0 deletions lib/client/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const defaultOptions = {
group: 'SOFA',
version: '1.0',
responseTimeout: 3000,
loadbalancerClass: 'roundRobin',
};

class RpcConsumer extends Base {
Expand Down
40 changes: 40 additions & 0 deletions lib/client/loadbalancer/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const Base = require('sdk-base');

class LoadBalancer extends Base {
constructor(addressGroup) {
super();
this.addressGroup = addressGroup;
this.reset();
this.ready(true);
}

get size() {
return this.addressGroup._choosedSize;
}

get addressList() {
return this.addressGroup.addressList;
}

select(request) {
if (this.size === 0) return null;
if (this.size === 1) return this.addressList[0];

return this._doSelect(request, this.addressList);
}

reset() {}

/* istanbul ignore next */
_doSelect() {
throw new Error('not implement');
}

getWeight(address) {
return this.addressGroup.getWeight(address);
}
}

module.exports = LoadBalancer;
66 changes: 66 additions & 0 deletions lib/client/loadbalancer/consistent_hash.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
'use strict';

const utils = require('../utils');
const LoadBalancer = require('./base');

const NUM = 128;

class ConsistentHashLoadBalancer extends LoadBalancer {
reset() {
this._virtualNodes = new Map();
if (this.addressList) {
for (const address of this.addressList) {
for (let i = 0; i < NUM / 4; i++) {
const digest = this._messageDigest(`${address.host}${i}`);
for (let h = 0; h < 4; h++) {
const m = this._hash(digest, h);
this._virtualNodes.set(m, address);
}
}
}
}
this._sortKeys = Array.from(this._virtualNodes.keys()).sort();
}

/* eslint-disable no-bitwise */
_hash(digest, index) {
const f = ((digest[3 + index * 4] & 0xFF) << 24) |
((digest[2 + index * 4] & 0xFF) << 16) |
((digest[1 + index * 4] & 0xFF) << 8) |
(digest[index * 4] & 0xFF);
return f & 0xFFFFFFFF;
}
/* eslint-enable no-bitwise */

_messageDigest(value) {
return utils.md5(value);
}

_selectForKey(hash) {
const len = this._sortKeys.length;
let key = this._sortKeys[0];
if (this._sortKeys[len - 1] >= hash) {
for (let i = len - 1; i >= 0; i--) {
if (this._sortKeys[i] < hash) {
key = this._sortKeys[i + 1];
break;
}
}
}
return this._virtualNodes.get(key);
}

_buildKeyOfHash(request) {
const args = request.args;
if (!args.length) return '';
return JSON.stringify(args[0]);
}

_doSelect(request) {
const key = this._buildKeyOfHash(request);
const digest = this._messageDigest(key);
return this._selectForKey(this._hash(digest, 0));
}
}

module.exports = ConsistentHashLoadBalancer;
24 changes: 24 additions & 0 deletions lib/client/loadbalancer/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

const assert = require('assert');

const buildinLoadBalancers = {
get random() {
return require('./random');
},
get roundRobin() {
return require('./weight_rr');
},
get consistentHash() {
return require('./consistent_hash');
},
};

module.exports = addressGroup => {
let loadbalancerClass = addressGroup.loadbalancerClass;
if (typeof loadbalancerClass === 'string') {
loadbalancerClass = buildinLoadBalancers[loadbalancerClass];
}
assert(typeof loadbalancerClass === 'function', `loadbalancerClass:${addressGroup.loadbalancerClass} invalid`);
return new loadbalancerClass(addressGroup);
};
39 changes: 39 additions & 0 deletions lib/client/loadbalancer/random.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

const utility = require('utility');
const LoadBalancer = require('./base');

// 负载均衡随机算法:全部列表按权重随机选择
class RandomLoadBalancer extends LoadBalancer {
_doSelect(request, addressList) {
const len = addressList.length;
let totalWeight = 0;
let isWeightSame = true;
let address;
for (let i = 0; i < len; i++) {
const weigit = this.getWeight(addressList[i]);
totalWeight += weigit;
if (isWeightSame && i > 0 && weigit !== this.getWeight(addressList[i - 1])) {
isWeightSame = false;
}
}
if (totalWeight > 0 && !isWeightSame) {
// 如果权重不相同且权重大于0则按总权重数随机
let offset = utility.random(totalWeight);
for (let i = 0; i < len; i++) {
// 并确定随机值落在哪个片断上
offset -= this.getWeight(addressList[i]);
if (offset < 0) {
address = addressList[i];
break;
}
}
} else {
const index = utility.random(len); // math.randomInt(len);
address = addressList[index];
}
return address;
}
}

module.exports = RandomLoadBalancer;
38 changes: 38 additions & 0 deletions lib/client/loadbalancer/weight_rr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';

const utility = require('utility');
const LoadBalancer = require('./base');

const DEFAULT_WEIGHT = 100;

// 带权重的 Round Robin 算法
class WeightRoundRobinLoadBalancer extends LoadBalancer {
reset() {
this._offset = utility.random(this.size);
}

_rr(request, addressList) {
const address = addressList[this._offset];
this._offset = (this._offset + 1) % this.size;

const weight = this.getWeight(address);
if (weight === DEFAULT_WEIGHT) return address;
if (weight === 0) return null;

const randNum = utility.random(DEFAULT_WEIGHT);
return weight >= randNum ? address : null;
}

_doSelect(request, addressList) {
let address;
let count = this.size;
while (count--) {
address = this._rr(request, addressList);
if (address) return address;
}
// 直接返回兜底
return addressList[this._offset];
}
}

module.exports = WeightRoundRobinLoadBalancer;
Loading

0 comments on commit 4608ddd

Please sign in to comment.