Skip to content

Commit

Permalink
Merge "node-SDK: add support for fabric events(block, chaincode, tran…
Browse files Browse the repository at this point in the history
…sactional)"
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Sep 13, 2016
2 parents 9bf95d0 + e9d3ac2 commit 2c890dd
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 41 deletions.
2 changes: 2 additions & 0 deletions sdk/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
"crypto": "0.0.3",
"debug": "^2.2.0",
"elliptic": "^6.2.3",
"es6-set": "^0.1.4",
"events": "^1.1.0",
"fs": "0.0.2",
"grpc": "^0.13.2-pre1",
"hashtable": "^2.0.2",
"js-sha3": "^0.5.1",
"json-stringify-safe": "^5.0.1",
"jsrsasign": "^5.0.10",
Expand Down
262 changes: 221 additions & 41 deletions sdk/node/src/hfc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ var jsrsa = require('jsrsasign');
var elliptic = require('elliptic');
var sha3 = require('js-sha3');
var BN = require('bn.js');
var Set = require('es6-set');
var HashTable = require('hashtable');
import * as crypto from "./crypto"
import * as stats from "./stats"
import * as sdk_util from "./sdk_util"
Expand Down Expand Up @@ -380,6 +382,7 @@ export interface TransactionProtobuf {
setConfidentialityProtocolVersion(version:string):void;
setNonce(nonce:Buffer):void;
setToValidators(Buffer):void;
getTxid():string;
getChaincodeID():{buffer: Buffer};
setChaincodeID(buffer:Buffer):void;
getMetadata():{buffer: Buffer};
Expand Down Expand Up @@ -430,6 +433,9 @@ export class Chain {
// The member services used for this chain
private memberServices:MemberServices;

// The eventHub service used for this chain
private eventHub:EventHub;

// The key-val store used for this chain
private keyValStore:KeyValStore;

Expand All @@ -441,14 +447,15 @@ export class Chain {

// Temporary variables to control how long to wait for deploy and invoke to complete before
// emitting events. This will be removed when the SDK is able to receive events from the
private deployWaitTime:number = 20;
private deployWaitTime:number = 30;
private invokeWaitTime:number = 5;

// The crypto primitives object
cryptoPrimitives:crypto.Crypto;

constructor(name:string) {
this.name = name;
this.eventHub = new EventHub();
}

/**
Expand Down Expand Up @@ -531,6 +538,29 @@ export class Chain {
}
};

/**
* Get the eventHub service associated this chain.
* @returns {eventHub} Return the current eventHub service, or undefined if not set.
*/
getEventHub():EventHub{
return this.eventHub;
};

/**
* Set and connect to the peer to be used as the event source.
*/
eventHubConnect(peeraddr: string):void {
this.eventHub.setPeerAddr(peeraddr);
this.eventHub.connect();
};

/**
* Set and connect to the peer to be used as the event source.
*/
eventHubDisconnect():void {
this.eventHub.disconnect();
};

/**
* Determine if security is enabled.
*/
Expand Down Expand Up @@ -1135,6 +1165,10 @@ export class TransactionContext extends events.EventEmitter {
private binding: any;
private tcert:TCert;
private attrs:string[];
private complete:boolean;
private timeoutId:any;
private waitTime:number;
private cevent:any;

constructor(member:Member, tcert:TCert) {
super();
Expand All @@ -1143,6 +1177,8 @@ export class TransactionContext extends events.EventEmitter {
this.memberServices = this.chain.getMemberServices();
this.tcert = tcert;
this.nonce = this.chain.cryptoPrimitives.generateNonce();
this.complete = false;
this.timeoutId = null;
}

/**
Expand Down Expand Up @@ -1367,7 +1403,47 @@ export class TransactionContext extends events.EventEmitter {
});
self.getChain().sendTransaction(tx, emitter);
} else {
self.getChain().sendTransaction(tx, self);
let txType = tx.pb.getType();
let uuid = tx.pb.getTxid();
let eh = self.getChain().getEventHub();
// async deploy and invokes need to maintain
// tx context(completion status(self.complete))
if ( txType == _fabricProto.Transaction.Type.CHAINCODE_DEPLOY) {
self.cevent = new EventDeployComplete(uuid, tx.chaincodeID);
self.waitTime = self.getChain().getDeployWaitTime();
} else if ( txType == _fabricProto.Transaction.Type.CHAINCODE_INVOKE) {
self.cevent = new EventInvokeComplete("Tx "+uuid+" complete");
self.waitTime = self.getChain().getInvokeWaitTime();
}
eh.registerTxEvent(uuid, function (uuid) {
self.complete = true;
if (self.timeoutId) {
clearTimeout(self.timeoutId);
}
eh.unregisterTxEvent(uuid);
self.emit("complete", self.cevent);
});
self.getChain().sendTransaction(tx, self);
// sync query can be skipped as response
// is processed and event generated in sendTransaction
// no timeout processing is necessary
if ( txType != _fabricProto.Transaction.Type.CHAINCODE_QUERY) {
debug("waiting %d seconds before emitting complete event", self.waitTime);
self.timeoutId = setTimeout(function() {
debug("timeout uuid=", uuid);
if(!self.complete)
// emit error if eventhub connect otherwise
// emit a complete event as done previously
if(eh.isconnected())
self.emit("error","timed out waiting for transaction to complete");
else
self.emit("complete",self.cevent);
else
eh.unregisterTxEvent(uuid);
},
self.waitTime * 1000
);
}
}
} else {
debug('Missing TCert...');
Expand Down Expand Up @@ -2121,7 +2197,6 @@ export class Peer {
let event = new EventDeploySubmitted(response.msg.toString(), tx.chaincodeID);
debug("EventDeploySubmitted event: %j", event);
eventEmitter.emit("submitted", event);
self.waitForDeployComplete(eventEmitter,event);
}
} else {
// Deploy completed with status "FAILURE" or "UNDEFINED"
Expand All @@ -2135,7 +2210,6 @@ export class Peer {
eventEmitter.emit("error", new EventTransactionError("the invoke response is missing the transaction UUID"));
} else {
eventEmitter.emit("submitted", new EventInvokeSubmitted(response.msg.toString()));
self.waitForInvokeComplete(eventEmitter);
}
} else {
// Invoke completed with status "FAILURE" or "UNDEFINED"
Expand All @@ -2157,43 +2231,6 @@ export class Peer {
});
};

/**
* TODO: Temporary hack to wait until the deploy event has hopefully completed.
* This does not detect if an error occurs in the peer or chaincode when deploying.
* When peer event listening is added to the SDK, this will be implemented correctly.
*/
private waitForDeployComplete(eventEmitter:events.EventEmitter, submitted:EventDeploySubmitted): void {
let waitTime = this.chain.getDeployWaitTime();
debug("waiting %d seconds before emitting deploy complete event",waitTime);
setTimeout(
function() {
let event = new EventDeployComplete(
submitted.uuid,
submitted.chaincodeID,
"TODO: get actual results; waited "+waitTime+" seconds and assumed deploy was successful"
);
eventEmitter.emit("complete",event);
},
waitTime * 1000
);
}

/**
* TODO: Temporary hack to wait until the deploy event has hopefully completed.
* This does not detect if an error occurs in the peer or chaincode when deploying.
* When peer event listening is added to the SDK, this will be implemented correctly.
*/
private waitForInvokeComplete(eventEmitter:events.EventEmitter): void {
let waitTime = this.chain.getInvokeWaitTime();
debug("waiting %d seconds before emitting invoke complete event",waitTime);
setTimeout(
function() {
eventEmitter.emit("complete",new EventInvokeComplete("waited "+waitTime+" seconds and assumed invoke was successful"));
},
waitTime * 1000
);
}

/**
* Remove the peer from the chain.
*/
Expand Down Expand Up @@ -2732,3 +2769,146 @@ export function getChain(chainName, create) {
export function newFileKeyValStore(dir:string):KeyValStore {
return new FileKeyValStore(dir);
}

/**
* The ChainCodeCBE is used internal to the EventHub to hold chaincode event registration callbacks.
*/
class ChainCodeCBE {
ccid: string;
eventname: string;
payload: Uint8Array;
cb: Function;
constructor(ccid: string,eventname: string,payload: Uint8Array, cb: Function) {
this.ccid = ccid;
this.eventname = eventname;
this.payload = payload;
this.cb = cb;
}
}

/**
* The EventHub is used to distribute events from a specific event source(peer)
*/
export class EventHub {
// peer addr to connect to
private peeraddr: string;
// grpc events interface
private events: any;
// grpc event client interface
private client: any;
// grpc chat streaming interface
private call: any;
// hashtable of clients registered for chaincode events
private chaincodeRegistrants: any;
// set of clients registered for block events
private blockRegistrants: any;
// hashtable of clients registered for transactional events
private txRegistrants: any;
// fabric connection state of this eventhub
private connected: boolean;
constructor() {
this.chaincodeRegistrants = new HashTable();
this.blockRegistrants = new Set();
this.txRegistrants = new HashTable();
this.peeraddr = "localhost:7053";
this.connected = false;
}

public setPeerAddr(peeraddr: string) {
this.peeraddr = peeraddr;
}

public isconnected() {
return this.connected;
}

public connect() {
if (this.connected) return;
this.events = grpc.load(__dirname + "/protos/events.proto" ).protos;
this.client = new this.events.Events(this.peeraddr,grpc.credentials.createInsecure());
this.call = this.client.chat();
this.connected = true;
this.registerBlockEvent(this.txCallback);

let eh = this; // for callback context
this.call.on('data', function(event) {
if ( event.Event == "chaincodeEvent" ) {
var cbe = eh.chaincodeRegistrants.get(event.chaincodeEvent.chaincodeID + "/" + event.chaincodeEvent.eventName);
if ( cbe ) {
cbe.payload = event.chaincodeEvent.payload;
cbe.cb(cbe);
}
} else if ( event.Event == "block") {
eh.blockRegistrants.forEach(function(cb){
cb(event.block);
});
}
});
this.call.on('end', function() {
eh.call.end();
// clean up Registrants - should app get notified?
eh.chaincodeRegistrants.clear();
eh.blockRegistrants.clear();
});
}

public disconnect() {
if (!this.connected) return;
this.unregisterBlockEvent(this.txCallback);
this.call.end();
this.connected = false;
}

public registerChaincodeEvent(ccid: string, eventname: string, callback: Function){
if (!this.connected) return;
let cb = new ChainCodeCBE(ccid, eventname, null, callback);
let register = { register: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid , eventName: eventname }} ] }};
this.chaincodeRegistrants.put(ccid + "/" + eventname, cb);
this.call.write(register);
}

public unregisterChaincodeEvent(ccid: string, eventname: string){
if (!this.connected) return;
var unregister = { unregister: { events: [ { eventType: "CHAINCODE", chaincodeRegInfo:{ chaincodeID: ccid, eventName: eventname }} ] }};
this.chaincodeRegistrants.remove(ccid + "/" + eventname);
this.call.write(unregister);
}

public registerBlockEvent(callback:Function){
if (!this.connected) return;
this.blockRegistrants.add(callback);
if(this.blockRegistrants.size==1) {
var register = { register: { events: [ { eventType: "BLOCK"} ] }};
this.call.write(register);
}
}

public unregisterBlockEvent(callback:Function){
if (!this.connected) return;
if(this.blockRegistrants.size<=1) {
var unregister = { unregister: { events: [ { eventType: "BLOCK"} ] }};
this.call.write(unregister);
}
this.blockRegistrants.delete(callback);
}

public registerTxEvent(txid:string, callback:Function){
debug("reg txid "+txid);
this.txRegistrants.put(txid, callback);
}

public unregisterTxEvent(txid:string){
this.txRegistrants.remove(txid);
}

private txCallback = (event) => {
debug("txCallback event=%j", event);
var eh = this;
event.transactions.forEach(function(transaction){
debug("transaction.txid="+transaction.txid);
var cb = eh.txRegistrants.get(transaction.txid);
if (cb)
cb(transaction.txid);
});
}
}
6 changes: 6 additions & 0 deletions sdk/node/test/unit/chain-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ if (fs.existsSync("tlsca.cert")) {
chain.setMemberServicesUrl("grpc://localhost:7054");
}
chain.addPeer("grpc://localhost:7051");
chain.eventHubConnect("localhost:7053");

process.on('exit', function (){
chain.eventHubDisconnect();
});

//
// Set the chaincode deployment mode to either developent mode (user runs chaincode)
Expand Down Expand Up @@ -759,6 +764,7 @@ test('Invoke a chaincode by enrolled user', function (t) {
invokeTx.on('submitted', function (results) {
// Invoke transaction submitted successfully
t.pass(util.format("Successfully submitted chaincode invoke transaction: request=%j, response=%j", invokeRequest, results));
chain.eventHubDisconnect();
});
invokeTx.on('error', function (err) {
// Invoke transaction submission failed
Expand Down

0 comments on commit 2c890dd

Please sign in to comment.