Skip to content

Commit

Permalink
BE-876 Stop unnecessary discovery request (#255)
Browse files Browse the repository at this point in the history
Explorer was making multiple `sendDiscoveryRequest` calls, 1 per block,
which retrieves the same information.

Signed-off-by: Atsushi Neki <nekiaiken@gmail.com>
  • Loading branch information
nekia authored Jul 4, 2021
1 parent fef5460 commit ae60298
Show file tree
Hide file tree
Showing 6 changed files with 4,506 additions and 29 deletions.
7 changes: 6 additions & 1 deletion app/platform/fabric/sync/FabricEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ export class FabricEvent {
async event => {
// Skip first block, it is process by peer event hub
if (!(event.blockNumber.low === 0 && event.blockNumber.high === 0)) {
await this.fabricServices.processBlockEvent(this.client, event.blockData);
const noDiscovery = false;
await this.fabricServices.processBlockEvent(
this.client,
event.blockData,
noDiscovery
);
}
},
{
Expand Down
13 changes: 7 additions & 6 deletions app/platform/fabric/sync/SyncPlatform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ export class SyncPlatform {
* Setting interval for validating any missing block from the current client ledger
* Set blocksSyncTime property in platform config.json in minutes
*/
(function validateMissingBlocks(sync) {
sync.isChannelEventHubConnected();
setTimeout(validateMissingBlocks, sync.blocksSyncTime, sync);
})(this);
// During initial sync-up phase, disable discovery request
(function validateMissingBlocks(sync: SyncPlatform, noDiscovery: boolean) {
sync.isChannelEventHubConnected(noDiscovery);
setTimeout(validateMissingBlocks, sync.blocksSyncTime, sync, false);
})(this, true);

logger.debug(
'******* Initialization end for child client process %s ******',
Expand All @@ -133,12 +134,12 @@ export class SyncPlatform {
*
* @memberof SyncPlatform
*/
async isChannelEventHubConnected() {
async isChannelEventHubConnected(noDiscovery: boolean) {
for (const channel_name of this.client.getChannels()) {
// Validate channel event is connected
const status = this.eventHub.isChannelEventHubConnected(channel_name);
if (status) {
await this.syncService.syncBlocks(this.client, channel_name);
await this.syncService.syncBlocks(this.client, channel_name, noDiscovery);
} else {
// Channel client is not connected then it will reconnect
this.eventHub.connectChannelEventHub(channel_name);
Expand Down
30 changes: 16 additions & 14 deletions app/platform/fabric/sync/SyncService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,16 @@ export class SyncServices {
.saveChaincodPeerRef(network_id, chaincode_peer_row);
}

async syncBlocks(client, channel_name) {
async syncBlocks(client, channel_name, noDiscovery) {
const network_id = client.getNetworkId();

// Get channel information from ledger
const channelInfo = await client.fabricGateway.queryChainInfo(channel_name);
// Get channel information from ledger
const channelInfo = await client.fabricGateway.queryChainInfo(channel_name);

if (!channelInfo) {
logger.info(
`syncBlocks: Failed to retrieve channelInfo >> ${channel_name}`,
);
return;
}
if (!channelInfo) {
logger.info(`syncBlocks: Failed to retrieve channelInfo >> ${channel_name}`);
return;
}
const synch_key = `${network_id}_${channel_name}`;
logger.info(`syncBlocks: Start >> ${synch_key}`);
if (this.synchInProcess.includes(synch_key)) {
Expand All @@ -393,7 +391,7 @@ export class SyncServices {
result.missing_id
);
if (block) {
await this.processBlockEvent(client, block);
await this.processBlockEvent(client, block, noDiscovery);
}
} catch {
logger.error(`Failed to process Block # ${result.missing_id}`);
Expand All @@ -410,7 +408,6 @@ export class SyncServices {
async updateDiscoveredChannel(client, channel_name, channel_genesis_hash) {
const network_id = client.getNetworkId();
// get discovery and insert new peer, orders details to db
await client.initializeChannelFromDiscover(channel_name);
await this.insertFromDiscoveryResults(
client,
channel_name,
Expand Down Expand Up @@ -459,7 +456,7 @@ export class SyncServices {
* @returns
* @memberof SyncServices
*/
async processBlockEvent(client, block) {
async processBlockEvent(client, block, noDiscovery) {
const network_id = client.getNetworkId();
// Get the first transaction
const first_tx = block.data.data[0];
Expand Down Expand Up @@ -492,7 +489,10 @@ export class SyncServices {
}
this.blocksInProcess.push(blockPro_key);

if (header.channel_header.typeString === fabric_const.BLOCK_TYPE_CONFIG) {
if (
!noDiscovery &&
header.channel_header.typeString === fabric_const.BLOCK_TYPE_CONFIG
) {
setTimeout(
async (cli, chName, chGenHash) => {
await this.updateDiscoveredChannel(cli, chName, chGenHash);
Expand Down Expand Up @@ -629,9 +629,11 @@ export class SyncServices {
const chaincode_id = String.fromCharCode.apply(null, chaincodeID);
// checking new chaincode is deployed
if (
!noDiscovery &&
header.channel_header.typeString ===
fabric_const.BLOCK_TYPE_ENDORSER_TRANSACTION &&
chaincode === fabric_const.CHAINCODE_LSCC
(chaincode === fabric_const.CHAINCODE_LSCC ||
chaincode === fabric_const.CHAINCODE_LIFECYCLE)
) {
setTimeout(
async (cli, chName, chGenHash) => {
Expand Down
1 change: 1 addition & 0 deletions app/platform/fabric/utils/FabricConst.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const fabric = {
BLOCK_TYPE_CONFIG: 'CONFIG',
BLOCK_TYPE_ENDORSER_TRANSACTION: 'ENDORSER_TRANSACTION',
CHAINCODE_LSCC: 'lscc',
CHAINCODE_LIFECYCLE: '_lifecycle',
NOTITY_TYPE_NEWCHANNEL: '1',
NOTITY_TYPE_UPDATECHANNEL: '2',
NOTITY_TYPE_CHAINCODE: '3',
Expand Down
24 changes: 16 additions & 8 deletions app/test/SyncService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import sinon from 'sinon';
import { SyncServices } from '../platform/fabric/sync/SyncService';
import * as stubBlock from './block.json';
import * as stubConfigBlock from './block_config.json';
import * as stubLifecycleBlock from './block_lifecycle.json';

import { ExplorerError } from '../common/ExplorerError';
import * as FabricConst from '../platform/fabric/utils/FabricConst';
Expand Down Expand Up @@ -128,8 +129,8 @@ describe('processBlockEvent', () => {
it('should return without error', async () => {
const stubClient = setupClient();

await expect(sync.processBlockEvent(stubClient, stubBlock)).eventually.to.be
.true;
await expect(sync.processBlockEvent(stubClient, stubBlock, false)).eventually
.to.be.true;
sinon.assert.calledOnce(stubPlatform.send);
sinon.assert.calledWith(
stubPlatform.send,
Expand All @@ -142,7 +143,7 @@ describe('processBlockEvent', () => {
const stubClient = setupClient();
sync.blocksInProcess = ['mychannel_9'];

await expect(sync.processBlockEvent(stubClient, stubBlock))
await expect(sync.processBlockEvent(stubClient, stubBlock, false))
.to.eventually.be.rejectedWith('Block already in processing')
.and.be.an.instanceOf(ExplorerError);
sinon.assert.notCalled(stubPlatform.send);
Expand All @@ -158,7 +159,7 @@ describe('processBlockEvent', () => {
const spyInsertDiscoveredCH = sinon.spy(sync, 'insertDiscoveredChannel');

const clock = sinon.useFakeTimers();
await expect(sync.processBlockEvent(stubClient, stubBlock))
await expect(sync.processBlockEvent(stubClient, stubBlock, false))
.to.eventually.be.rejectedWith('mychannel has not been inserted yet')
.and.be.an.instanceOf(ExplorerError);
clock.tick(20000);
Expand All @@ -174,7 +175,7 @@ describe('processBlockEvent', () => {
const spyUpdateDiscoveredCH = sinon.spy(sync, 'updateDiscoveredChannel');

const clock = sinon.useFakeTimers();
await expect(sync.processBlockEvent(stubClient, stubConfigBlock)).to
await expect(sync.processBlockEvent(stubClient, stubConfigBlock, false)).to
.eventually.to.be.true;
clock.tick(20000);

Expand All @@ -199,7 +200,14 @@ describe('processBlockEvent', () => {
const stubClient = setupClient();

stubConfigBlock.data.data[0].payload.data.last_update.payload = null;
await expect(sync.processBlockEvent(stubClient, stubConfigBlock)).to
await expect(sync.processBlockEvent(stubClient, stubConfigBlock, false)).to
.eventually.to.be.true;
});

it('should be done without any errors when _lifecycle block is processed', async () => {
const stubClient = setupClient();

await expect(sync.processBlockEvent(stubClient, stubLifecycleBlock, false)).to
.eventually.to.be.true;
});
});
Expand All @@ -219,7 +227,7 @@ describe('syncBlocks', () => {
const stubClient = setupClient();
const stubProcessBlockEvent = sinon.stub(sync, 'processBlockEvent');

await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME);
await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME, false);
expect(stubProcessBlockEvent.calledTwice).to.be.true;
stubProcessBlockEvent.restore();
});
Expand All @@ -230,7 +238,7 @@ describe('syncBlocks', () => {
stubProcessBlockEvent.onFirstCall().throws('Block already in processing');
stubError.reset();

await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME);
await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME, false);
expect(stubProcessBlockEvent.calledTwice).to.be.true;
expect(stubError.calledWith('Failed to process Block # 1')).to.be.true;
expect(stubError.calledWith('Failed to process Block # 2')).to.be.false;
Expand Down
Loading

0 comments on commit ae60298

Please sign in to comment.