Skip to content

Commit

Permalink
fix: dubbo registry issue (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer authored Oct 31, 2018
1 parent 9f5ef1b commit 8355941
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 16 deletions.
12 changes: 9 additions & 3 deletions example/dubbo.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@ async function invoke() {
logger,
registry,
protocol,
group: 'dubbo',
version: null,
group: 'HSF',
version: '1.0.0',
});
const consumer = client.createConsumer({
interfaceName: 'org.apache.dubbo.demo.DemoService',
});
await consumer.ready();

const result = await consumer.invoke('sayHello', [{
let result = await consumer.invoke('sayHello', [{
$class: 'java.lang.String',
$: 'zongyu',
}], { responseTimeout: 3000 });
console.log(result);

result = await consumer.invoke('sayHello', [{
$class: 'java.lang.String',
$: 'zongyu',
}], { responseTimeout: 3000 });
Expand Down
33 changes: 33 additions & 0 deletions example/dubbo_server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';

const { RpcServer } = require('../').server;
const { ZookeeperRegistry } = require('../').registry;
const protocol = require('dubbo-remoting');
const logger = console;

const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181/dubbo/',
});

const server = new RpcServer({
logger,
registry,
port: 12200,
group: 'HSF',
protocol,
version: '1.0.0',
});

server.addService({
interfaceName: 'org.apache.dubbo.demo.DemoService',
}, {
async sayHello(name) {
return 'hello ' + name;
},
});

server.start()
.then(() => {
server.publish();
});
1 change: 1 addition & 0 deletions lib/client/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class RpcConsumer extends Base {
return new RpcRequest({
targetAppName: this.targetAppName,
serverSignature: this.id,
group: this.group,
methodName: method,
args,
requestProps: options.requestProps,
Expand Down
1 change: 1 addition & 0 deletions lib/client/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RpcRequest {

this.targetAppName = data.targetAppName;
this.serverSignature = data.serverSignature;
this.group = data.group;
this.methodName = data.methodName;
this.args = data.args;
this.timeout = data.timeout;
Expand Down
34 changes: 33 additions & 1 deletion lib/registry/zk/data_client.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const { URL } = require('url');
const assert = require('assert');
const Base = require('sdk-base');
const fmt = require('util').format;
Expand Down Expand Up @@ -84,7 +85,11 @@ class ZookeeperRegistry extends Base {
this.emit('error', err);
return;
}
const addressList = children.map(url => urlencode.decode(url));
const originAddressList = children.map(url => urlencode.decode(url));
const addressList = originAddressList.filter(url => this._isMatch(config, url));
this.logger.info('[ZookeeperRegistry] receive interface:%s:%s@%s address list (%d):\n%s\nvalid providers (%d):\n%s',
config.interfaceName, config.version || '', config.group || '',
originAddressList.length, formatAddrs(originAddressList), addressList.length, formatAddrs(addressList));
this._subscribeMap.set(interfaceName, addressList);
this.emit(interfaceName, addressList);
});
Expand Down Expand Up @@ -136,9 +141,36 @@ class ZookeeperRegistry extends Base {
return this._rootPath + config.interfaceName + '/consumers';
}

_isMatch(consumer, urlStr) {
const url = new URL(urlStr);
const providerInfo = url.searchParams || {};
const interfaceName = providerInfo.get('interface') || url.pathname.slice(1);
if (interfaceName && consumer.interfaceName !== interfaceName) {
return false;
}
const category = providerInfo.get('category');
if (category && category !== 'providers') {
return false;
}
const enabled = providerInfo.get('enabled');
if (enabled && enabled !== 'true') {
return false;
}
const consumerGroup = consumer.group;
const consumerVersion = consumer.version;
const providerGroup = providerInfo.get('group');
const providerVersion = providerInfo.get('version');
return (!consumerGroup || consumerGroup === providerGroup) &&
(!consumerVersion || consumerVersion === providerVersion);
}

close() {
return this._zkClient.close();
}
}

function formatAddrs(addrs) {
return addrs.map(addr => ' - ' + addr).join('\n');
}

module.exports = ZookeeperRegistry;
4 changes: 2 additions & 2 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ class RpcServer extends Base {
}

get url() {
const type = this.protocol.name || 'bolt';
// uniqueId=&version=1.0&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400
return 'bolt://' + this.publishAddress + ':' + this.publishPort + '?' + qs.stringify(this.params);
return type + '://' + this.publishAddress + ':' + this.publishPort + '?' + qs.stringify(this.params);
}

get params() {
Expand All @@ -107,7 +108,6 @@ class RpcServer extends Base {
uniqueId: '',
dynamic: 'true',
appName: this.options.appName,
version: this.options.version,
timeout: this.options.responseTimeout,
serialization: this.options.codecType,
weight: 100,
Expand Down
14 changes: 10 additions & 4 deletions lib/server/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const { URL } = require('url');

class RpcService extends Base {
constructor(options = {}) {
Expand Down Expand Up @@ -46,13 +47,18 @@ class RpcService extends Base {
return this.options.registry;
}

normalizeReg(url) {
return {
normalizeReg(urlStr) {
const url = new URL(urlStr);
url.searchParams.set('interface', this.interfaceName);
url.searchParams.set('version', this.version);
url.searchParams.set('group', this.group);
const reg = {
interfaceName: this.interfaceName,
version: this.version,
group: this.group,
url,
url: url.toString(),
};
return reg;
}

/**
Expand All @@ -65,7 +71,7 @@ class RpcService extends Base {
if (!this.registry) return Promise.resolve();

const reg = this.normalizeReg(url);
this.publishUrl = url;
this.publishUrl = reg.url;
return this.registry.register(reg);
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"await-event": "^2.1.0",
"coffee": "^5.1.0",
"contributors": "^0.5.1",
"dubbo-remoting": "^2.1.2",
"dubbo-remoting": "^2.1.3",
"egg-bin": "^4.9.0",
"eslint": "^5.8.0",
"eslint-config-egg": "^7.1.0",
Expand Down
2 changes: 1 addition & 1 deletion test/client/fault_retry.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('test/client/fault_retry.test.js', () => {

setTimeout(() => {
addressGroup.emit('timeout');
}, 9000);
}, 8000);

const o = await addressGroup.awaitFirst([ 'retry', 'timeout' ]);
assert(o.event === 'timeout');
Expand Down
4 changes: 2 additions & 2 deletions test/client/loadbalancer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ describe('test/client/loadbalancer.test.js', () => {
}
const per = 100000 / count;
for (let i = 0; i < 20; i++) {
assert(per * (i + 1) * 0.85 < cnt.get(9000 + i) &&
per * (i + 1) * 1.15 > cnt.get(9000 + i)); // 随机偏差不会太大,应该不超过15%
assert(per * (i + 1) * 0.80 < cnt.get(9000 + i) &&
per * (i + 1) * 1.20 > cnt.get(9000 + i)); // 随机偏差不会太大,应该不超过20%
}
});
});
Expand Down
112 changes: 112 additions & 0 deletions test/registry/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,117 @@ describe('test/registry/index.test.js', () => {
assert(addressList && addressList.length === 1);
assert(addressList[0] === 'bolt://127.0.0.1:12200');
});

it('should filter with version & group', async () => {
const reg1 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
url: 'dubbo://127.0.0.1:12200/org.apache.dubbo.demo.DemoService?accepts=100000&appName=&application=demo-consumer&check=false&dubbo=2.0.2&dynamic=true&group=HSF&interface=org.apache.dubbo.demo.DemoService&language=nodejs&methods=sayHello&pid=45510&qos.port=33333&register.ip=192.168.1.13&revision=1.0.0&rpcVer=50400&serialization=hessian2&side=consumer&startTime=1540925808939&timeout=3000&timestamp=1540925836963&uniqueId=&version=1.0.0&weight=100',
};
const reg2 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0.test',
group: 'HSF',
url: 'dubbo://127.0.0.2:12200?uniqueId=&version=1.0.0.test&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400&group=HSF&interface=org.apache.dubbo.demo.DemoService',
};
const reg3 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'SOFA',
url: 'dubbo://127.0.0.3:12200?uniqueId=&version=1.0&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400&group=SOFA&interface=org.apache.dubbo.demo.DemoService',
};
registry.register(reg1);
registry.register(reg2);
registry.register(reg3);

await sleep(2000);

let addressList;
const listener = val => {
addressList = val;
registry.emit('address_update', val);
};
registry.subscribe({
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
}, listener);

await registry.await('address_update');

registry.unSubscribe({
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
});

assert(addressList && addressList.length === 1);
assert(addressList[0].startsWith('dubbo://127.0.0.1:12200'));

await registry.unRegister(reg1);
await registry.unRegister(reg2);
await registry.unRegister(reg3);
});

it('should filter with interfaceName', async () => {
const reg1 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
url: 'dubbo://127.0.0.1:12200/org.apache.dubbo.demo.DemoService?accepts=100000&appName=&application=demo-consumer&check=false&dubbo=2.0.2&dynamic=true&group=HSF&interface=org.apache.dubbo.demo.DemoService&language=nodejs&methods=sayHello&pid=45510&qos.port=33333&register.ip=192.168.1.13&revision=1.0.0&rpcVer=50400&serialization=hessian2&side=consumer&startTime=1540925808939&timeout=3000&timestamp=1540925836963&uniqueId=&version=1.0.0&weight=100',
};
const reg2 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
url: 'dubbo://127.0.0.2:12200?uniqueId=&version=1.0.0.test&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400&group=HSF&interface=org.apache.dubbo.demo.HelloService',
};
const reg3 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
url: 'dubbo://127.0.0.3:12200?uniqueId=&version=1.0&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400&group=HSF&interface=org.apache.dubbo.demo.DemoService&category=routers',
};
const reg4 = {
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
url: 'dubbo://127.0.0.4:12200?uniqueId=&version=1.0&timeout=0&delay=-1&id=rpc-cfg-0&dynamic=true&weight=100&accepts=100000&startTime=1526050447423&pid=13862&language=java&rpcVer=50400&group=HSF&interface=org.apache.dubbo.demo.DemoService&enabled=false',
};
registry.register(reg1);
registry.register(reg2);
registry.register(reg3);
registry.register(reg4);

await sleep(2000);

let addressList;
const listener = val => {
addressList = val;
registry.emit('address_update', val);
};
registry.subscribe({
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
}, listener);

await registry.await('address_update');

registry.unSubscribe({
interfaceName: 'org.apache.dubbo.demo.DemoService',
version: '1.0.0',
group: 'HSF',
});

assert(addressList && addressList.length === 1);
assert(addressList[0].startsWith('dubbo://127.0.0.1:12200'));

await registry.unRegister(reg1);
await registry.unRegister(reg2);
await registry.unRegister(reg3);
await registry.unRegister(reg4);
});
});
});
15 changes: 14 additions & 1 deletion test/server/server.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const mm = require('mm');
const assert = require('assert');
const sleep = require('mz-modules/sleep');
const request = require('../../').test;
const dubboProtocol = require('dubbo-remoting');
const RpcClient = require('../../').client.RpcClient;
const RpcServer = require('../../').server.RpcServer;
const ZookeeperRegistry = require('../../').registry.ZookeeperRegistry;
Expand Down Expand Up @@ -43,7 +44,19 @@ describe('test/server.test.js', () => {
logger,
codecType: 'protobuf',
});
assert(server.url.endsWith('dynamic=true&appName=test&version=1.0&timeout=3000&serialization=protobuf&weight=100&accepts=100000&language=nodejs&rpcVer=50400'));
assert(server.url.endsWith('dynamic=true&appName=test&timeout=3000&serialization=protobuf&weight=100&accepts=100000&language=nodejs&rpcVer=50400'));
await server.close();
});

it('should format url with property protocol type', async function() {
server = new RpcServer({
appName: 'test',
protocol: dubboProtocol,
logger,
codecType: 'hessian2',
});
assert(server.url.startsWith('dubbo://'));
assert(server.url.endsWith('dynamic=true&appName=test&timeout=3000&serialization=hessian2&weight=100&accepts=100000&language=nodejs&rpcVer=50400'));
await server.close();
});

Expand Down
2 changes: 1 addition & 1 deletion test/server/service.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('test/server/service.test.js', () => {
interfaceName: 'com.node.test.TestService',
version: '1.0',
group: 'SOFA',
url: 'bolt://127.0.0.1:12200',
url: 'bolt://127.0.0.1:12200?interface=com.node.test.TestService&version=1.0&group=SOFA',
});

await service.publish('bolt://127.0.0.1:12200');
Expand Down

0 comments on commit 8355941

Please sign in to comment.