mirror of
https://github.com/hyperledger/fabric-samples.git
synced 2026-06-17 07:25:10 +00:00
FAB-6400 Balance-transfer filtered events
Update sample code to use the channel-based events. The sample will also use the new connection profile API to get a list of channel-based NodeSDK event hubs using filtered blocks, automatic unregistration, and automatic disconnect, all new features of channel-based events. This will demostrate the most common use case for events. The sample code will require the NodeSDK to be at 1.1 alpha. Change-Id: Id9f2b37f02d7d662b7ca1016586560ee4c595992 Signed-off-by: Bret Harrison <beharrison@nc.rr.com>
This commit is contained in:
parent
831e9bfe80
commit
ffd7a2576c
3 changed files with 41 additions and 50 deletions
|
|
@ -25,7 +25,7 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
|
|||
logger.debug('\n\n============ Instantiate chaincode on channel ' + channelName +
|
||||
' ============\n');
|
||||
var error_message = null;
|
||||
var eventhubs_in_use = [];
|
||||
|
||||
try {
|
||||
// first setup the client for this org
|
||||
var client = await helper.getClientForOrg(org_name, username);
|
||||
|
|
@ -84,27 +84,25 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
|
|||
logger.info(util.format(
|
||||
'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s',
|
||||
proposalResponses[0].response.status, proposalResponses[0].response.message,
|
||||
proposalResponses[0].response.payload, proposalResponses[0].endorsement
|
||||
.signature));
|
||||
proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature));
|
||||
|
||||
// tell each peer to join and wait for the event hub of each peer to tell us
|
||||
// that the channel has been created on each peer
|
||||
// wait for the channel-based event hub to tell us that the
|
||||
// instantiate transaction was committed on the peer
|
||||
var promises = [];
|
||||
let event_hubs = client.getEventHubsForOrg(org_name);
|
||||
let event_hubs = channel.getChannelEventHubsForOrg();
|
||||
logger.debug('found %s eventhubs for this organization %s',event_hubs.length, org_name);
|
||||
event_hubs.forEach((eh) => {
|
||||
let instantiateEventPromise = new Promise((resolve, reject) => {
|
||||
logger.debug('instantiateEventPromise - setting up event');
|
||||
let event_timeout = setTimeout(() => {
|
||||
let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr;
|
||||
let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr();
|
||||
logger.error(message);
|
||||
eh.disconnect();
|
||||
reject(new Error(message));
|
||||
}, 60000);
|
||||
eh.registerTxEvent(deployId, (tx, code) => {
|
||||
logger.info('The chaincode instantiate transaction has been committed on peer %s',eh._ep._endpoint.addr);
|
||||
eh.registerTxEvent(deployId, (tx, code, block_num) => {
|
||||
logger.info('The chaincode instantiate transaction has been committed on peer %s',eh.getPeerAddr());
|
||||
logger.info('Transaction %s has status of %s in blocl %s', tx, code, block_num);
|
||||
clearTimeout(event_timeout);
|
||||
eh.unregisterTxEvent(deployId);
|
||||
|
||||
if (code !== 'VALID') {
|
||||
let message = until.format('The chaincode instantiate transaction was invalid, code:%s',code);
|
||||
|
|
@ -117,15 +115,18 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
|
|||
}
|
||||
}, (err) => {
|
||||
clearTimeout(event_timeout);
|
||||
eh.unregisterTxEvent(deployId);
|
||||
let message = 'Problem setting up the event hub :'+ err.toString();
|
||||
logger.error(message);
|
||||
reject(new Error(message));
|
||||
});
|
||||
logger.error(err);
|
||||
reject(err);
|
||||
},
|
||||
// the default for 'unregister' is true for transaction listeners
|
||||
// so no real need to set here, however for 'disconnect'
|
||||
// the default is false as most event hubs are long running
|
||||
// in this use case we are using it only once
|
||||
{unregister: true, disconnect: true}
|
||||
);
|
||||
eh.connect();
|
||||
});
|
||||
promises.push(instantiateEventPromise);
|
||||
eh.connect();
|
||||
eventhubs_in_use.push(eh);
|
||||
});
|
||||
|
||||
var orderer_request = {
|
||||
|
|
@ -155,7 +156,7 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
|
|||
for(let i in results) {
|
||||
let event_hub_result = results[i];
|
||||
let event_hub = event_hubs[i];
|
||||
logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr);
|
||||
logger.debug('Event results for event hub :%s',event_hub.getPeerAddr());
|
||||
if(typeof event_hub_result === 'string') {
|
||||
logger.debug(event_hub_result);
|
||||
} else {
|
||||
|
|
@ -172,11 +173,6 @@ var instantiateChaincode = async function(peers, channelName, chaincodeName, cha
|
|||
error_message = error.toString();
|
||||
}
|
||||
|
||||
// need to shutdown open event streams
|
||||
eventhubs_in_use.forEach((eh) => {
|
||||
eh.disconnect();
|
||||
});
|
||||
|
||||
if (!error_message) {
|
||||
let message = util.format(
|
||||
'Successfully instantiate chaingcode in organization %s to the channel \'%s\'',
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ var logger = helper.getLogger('invoke-chaincode');
|
|||
var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn, args, username, org_name) {
|
||||
logger.debug(util.format('\n============ invoke transaction on channel %s ============\n', channelName));
|
||||
var error_message = null;
|
||||
var eventhubs_in_use = [];
|
||||
var tx_id_string = null;
|
||||
try {
|
||||
// first setup the client for this org
|
||||
|
|
@ -78,26 +77,24 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
|
|||
logger.info(util.format(
|
||||
'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s", metadata - "%s", endorsement signature: %s',
|
||||
proposalResponses[0].response.status, proposalResponses[0].response.message,
|
||||
proposalResponses[0].response.payload, proposalResponses[0].endorsement
|
||||
.signature));
|
||||
proposalResponses[0].response.payload, proposalResponses[0].endorsement.signature));
|
||||
|
||||
// tell each peer to join and wait for the event hub of each peer to tell us
|
||||
// that the channel has been created on each peer
|
||||
// wait for the channel-based event hub to tell us
|
||||
// that the commit was good or bad on each peer in our organization
|
||||
var promises = [];
|
||||
let event_hubs = client.getEventHubsForOrg(org_name);
|
||||
let event_hubs = channel.getChannelEventHubsForOrg();
|
||||
event_hubs.forEach((eh) => {
|
||||
logger.debug('invokeEventPromise - setting up event');
|
||||
let invokeEventPromise = new Promise((resolve, reject) => {
|
||||
let event_timeout = setTimeout(() => {
|
||||
let message = 'REQUEST_TIMEOUT:' + eh._ep._endpoint.addr;
|
||||
let message = 'REQUEST_TIMEOUT:' + eh.getPeerAddr();
|
||||
logger.error(message);
|
||||
eh.disconnect();
|
||||
reject(new Error(message));
|
||||
}, 3000);
|
||||
eh.registerTxEvent(tx_id_string, (tx, code) => {
|
||||
logger.info('The chaincode invoke chaincode transaction has been committed on peer %s',eh._ep._endpoint.addr);
|
||||
eh.registerTxEvent(tx_id_string, (tx, code, block_num) => {
|
||||
logger.info('The chaincode invoke chaincode transaction has been committed on peer %s',eh.getPeerAddr());
|
||||
logger.info('Transaction %s has status of %s in blocl %s', tx, code, block_num);
|
||||
clearTimeout(event_timeout);
|
||||
eh.unregisterTxEvent(tx_id_string);
|
||||
|
||||
if (code !== 'VALID') {
|
||||
let message = util.format('The invoke chaincode transaction was invalid, code:%s',code);
|
||||
|
|
@ -110,15 +107,18 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
|
|||
}
|
||||
}, (err) => {
|
||||
clearTimeout(event_timeout);
|
||||
eh.unregisterTxEvent(tx_id_string);
|
||||
let message = 'Problem setting up the event hub :'+ err.toString();
|
||||
logger.error(message);
|
||||
reject(new Error(message));
|
||||
});
|
||||
logger.error(err);
|
||||
reject(err);
|
||||
},
|
||||
// the default for 'unregister' is true for transaction listeners
|
||||
// so no real need to set here, however for 'disconnect'
|
||||
// the default is false as most event hubs are long running
|
||||
// in this use case we are using it only once
|
||||
{unregister: true, disconnect: true}
|
||||
);
|
||||
eh.connect();
|
||||
});
|
||||
promises.push(invokeEventPromise);
|
||||
eh.connect();
|
||||
eventhubs_in_use.push(eh);
|
||||
});
|
||||
|
||||
var orderer_request = {
|
||||
|
|
@ -144,7 +144,7 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
|
|||
for(let i in results) {
|
||||
let event_hub_result = results[i];
|
||||
let event_hub = event_hubs[i];
|
||||
logger.debug('Event results for event hub :%s',event_hub._ep._endpoint.addr);
|
||||
logger.debug('Event results for event hub :%s',event_hub.getPeerAddr());
|
||||
if(typeof event_hub_result === 'string') {
|
||||
logger.debug(event_hub_result);
|
||||
} else {
|
||||
|
|
@ -161,11 +161,6 @@ var invokeChaincode = async function(peerNames, channelName, chaincodeName, fcn,
|
|||
error_message = error.toString();
|
||||
}
|
||||
|
||||
// need to shutdown open event streams
|
||||
eventhubs_in_use.forEach((eh) => {
|
||||
eh.disconnect();
|
||||
});
|
||||
|
||||
if (!error_message) {
|
||||
let message = util.format(
|
||||
'Successfully invoked the chaincode %s to the channel \'%s\' for transaction ID: %s',
|
||||
|
|
|
|||
|
|
@ -12,8 +12,8 @@
|
|||
"v1.0 fabric nodesdk sample"
|
||||
],
|
||||
"engines": {
|
||||
"node": ">=6.9.5 <7.0",
|
||||
"npm": ">=3.10.10 <4.0"
|
||||
"node": ">=8.9.4 <9.0",
|
||||
"npm": ">=5.6.0 <6.0"
|
||||
},
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
|
|
|
|||
Loading…
Reference in a new issue