Add block event listener config

Signed-off-by: James Taylor <jamest@uk.ibm.com>
This commit is contained in:
James Taylor 2021-09-10 15:53:51 +01:00
parent f1a9fea77d
commit 00a2dea50b
6 changed files with 158 additions and 57 deletions

View file

@ -152,6 +152,28 @@ describe('Config values', () => {
}); });
}); });
describe('blockListenerOrg', () => {
it('defaults to "Org1"', () => {
const config = require('./config');
expect(config.blockListenerOrg).toBe('Org1');
});
it('can be configured using the "HLF_BLOCK_LISTENER_ORG" environment variable', () => {
process.env.HLF_BLOCK_LISTENER_ORG = 'Org2';
const config = require('./config');
expect(config.blockListenerOrg).toBe('Org2');
});
it('throws an error when the "HLF_BLOCK_LISTENER_ORG" environment variable has an invalid value', () => {
process.env.HLF_BLOCK_LISTENER_ORG = 'Org3';
expect(() => {
require('./config');
}).toThrow(
'env-var: "HLF_BLOCK_LISTENER_ORG" should be one of [Org1, Org2]'
);
});
});
describe('channelName', () => { describe('channelName', () => {
it('defaults to "mychannel"', () => { it('defaults to "mychannel"', () => {
const config = require('./config'); const config = require('./config');

View file

@ -4,6 +4,9 @@
import * as env from 'env-var'; import * as env from 'env-var';
export const ORG1 = 'Org1';
export const ORG2 = 'Org2';
/* /*
* Log level for the REST server * Log level for the REST server
*/ */
@ -55,8 +58,8 @@ export const asLocalhost = env
*/ */
export const mspIdOrg1 = env export const mspIdOrg1 = env
.get('HLF_MSP_ID_ORG1') .get('HLF_MSP_ID_ORG1')
.default('Org1MSP') .default(`${ORG1}MSP`)
.example('Org1MSP') .example(`${ORG1}MSP`)
.asString(); .asString();
/* /*
@ -64,10 +67,18 @@ export const mspIdOrg1 = env
*/ */
export const mspIdOrg2 = env export const mspIdOrg2 = env
.get('HLF_MSP_ID_ORG2') .get('HLF_MSP_ID_ORG2')
.default('Org2MSP') .default(`${ORG2}MSP`)
.example('Org2MSP') .example(`${ORG2}MSP`)
.asString(); .asString();
/*
* The block listener org
*/
export const blockListenerOrg = env
.get('HLF_BLOCK_LISTENER_ORG')
.default(ORG1)
.asEnum([ORG1, ORG2]);
/* /*
* Name of the channel which the basic asset sample chaincode has been installed on * Name of the channel which the basic asset sample chaincode has been installed on
*/ */

View file

@ -11,6 +11,7 @@ import {
submitTransaction, submitTransaction,
getBlockHeight, getBlockHeight,
startRetryLoop, startRetryLoop,
blockEventHandler,
} from './fabric'; } from './fabric';
import * as config from './config'; import * as config from './config';
@ -22,11 +23,13 @@ import {
} from './errors'; } from './errors';
import { import {
BlockEvent,
Contract, Contract,
Gateway, Gateway,
GatewayOptions, GatewayOptions,
Network, Network,
Transaction, Transaction,
TransactionEvent,
Wallet, Wallet,
} from 'fabric-network'; } from 'fabric-network';
@ -40,6 +43,36 @@ jest.mock('./config');
jest.mock('ioredis', () => require('ioredis-mock/jest')); jest.mock('ioredis', () => require('ioredis-mock/jest'));
describe('Fabric', () => { describe('Fabric', () => {
const mockTransactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
const mockKey = `txn:${mockTransactionId}`;
const mockMspId = 'Org1MSP';
const mockState = Buffer.from(
`{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}`
);
const mockArgs = '["test111","red",400,"Jean",101]';
const mockTimestamp = 1628078044362;
const addMockTransationDetails = async (redis: Redis) => {
await redis
.multi()
.hset(
mockKey,
'mspId',
mockMspId,
'state',
mockState,
'args',
mockArgs,
'timestamp',
mockTimestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', mockTimestamp, mockTransactionId)
.exec();
};
describe('createWallet', () => { describe('createWallet', () => {
it('creates a wallet containing identities for both orgs', async () => { it('creates a wallet containing identities for both orgs', async () => {
const wallet = await createWallet(); const wallet = await createWallet();
@ -110,41 +143,11 @@ describe('Fabric', () => {
let mockContract: MockProxy<Contract>; let mockContract: MockProxy<Contract>;
let mockContracts: Map<string, Contract>; let mockContracts: Map<string, Contract>;
const mockTransactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
const mockKey = `txn:${mockTransactionId}`;
const mockMspId = 'Org1MSP';
const mockState = Buffer.from(
`{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}`
);
const mockArgs = '["test111","red",400,"Jean",101]';
const mockTimestamp = 1628078044362;
const flushPromises = () => { const flushPromises = () => {
jest.useRealTimers(); jest.useRealTimers();
return new Promise((resolve) => setImmediate(resolve)); return new Promise((resolve) => setImmediate(resolve));
}; };
const addMockTransationDetails = async (redis: Redis) => {
await redis
.multi()
.hset(
mockKey,
'mspId',
mockMspId,
'state',
mockState,
'args',
mockArgs,
'timestamp',
mockTimestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', mockTimestamp, mockTransactionId)
.exec();
};
beforeEach(() => { beforeEach(() => {
const redisOptions = { const redisOptions = {
port: config.redisPort, port: config.redisPort,
@ -485,6 +488,63 @@ describe('Fabric', () => {
}); });
}); });
describe('blockEventHandler', () => {
let redis: Redis;
let mockIsValidGetter: jest.Mock<boolean, []>;
let mockTransactionIdGetter: jest.Mock<string, []>;
let mockTransactionEvent: MockProxy<TransactionEvent>;
let mockBlockEvent: MockProxy<BlockEvent>;
beforeEach(async () => {
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
redis = new IORedis(redisOptions) as unknown as Redis;
addMockTransationDetails(redis);
const baseMock = {};
mockTransactionEvent = mock<TransactionEvent>(baseMock);
mockIsValidGetter = jest.fn<boolean, []>();
Object.defineProperty(baseMock, 'isValid', { get: mockIsValidGetter });
mockTransactionIdGetter = jest.fn<string, []>();
Object.defineProperty(baseMock, 'transactionId', {
get: mockTransactionIdGetter,
});
mockBlockEvent = mock<BlockEvent>();
mockBlockEvent.getTransactionEvents.mockReturnValue([
mockTransactionEvent,
]);
});
it('clears saved details for valid transactions', async () => {
const blockListener = blockEventHandler(redis);
mockIsValidGetter.mockReturnValue(true);
mockTransactionIdGetter.mockReturnValue(mockTransactionId);
await blockListener(mockBlockEvent);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
it('does not clear saved details for invalid transactions', async () => {
const blockListener = blockEventHandler(redis);
mockIsValidGetter.mockReturnValue(false);
await blockListener(mockBlockEvent);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95',
]);
});
});
describe('getBlockHeight', () => { describe('getBlockHeight', () => {
it('gets the current block height', async () => { it('gets the current block height', async () => {
const mockBlockchainInfoProto = const mockBlockchainInfoProto =

View file

@ -344,23 +344,35 @@ const isDuplicateTransactionError = (error: {
return false; return false;
}; };
/*
* Block event listener to handle successful transactions
*
* Transaction details are saved before being submitted so that
* they can be retried, and this listener deletes those transaction
* details for any successful transactions
*
* Transactions can be submitted using one of two identities
* however one one of those identities is used to listen for
* block events
*/
export const blockEventHandler = (redis: Redis): BlockListener => { export const blockEventHandler = (redis: Redis): BlockListener => {
const blockListner = async (event: BlockEvent) => { const blockListener = async (event: BlockEvent) => {
logger.debug('Block event received '); logger.debug(
const transEvents: Array<TransactionEvent> = event.getTransactionEvents(); { blockNumber: event.blockNumber.toString() },
'Block event received'
);
const transactionEvents: Array<TransactionEvent> =
event.getTransactionEvents();
for (const transEvent of transEvents) { for (const event of transactionEvents) {
if (transEvent && transEvent.isValid) { if (event && event.isValid) {
logger.debug( logger.debug('Remove transation with txnId %s', event.transactionId);
'Remove transation with txnId %s', await clearTransactionDetails(redis, event.transactionId);
transEvent.transactionId
);
await clearTransactionDetails(redis, transEvent.transactionId);
} }
} }
}; };
return blockListner; return blockListener;
}; };
export const getBlockHeight = async ( export const getBlockHeight = async (

View file

@ -2,22 +2,13 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import { Network } from 'fabric-network';
import { Redis } from 'ioredis';
import * as config from './config'; import * as config from './config';
import { blockEventHandler } from './fabric';
import { logger } from './logger'; import { logger } from './logger';
import { createServer } from './server'; import { createServer } from './server';
async function main() { async function main() {
const app = await createServer(); const app = await createServer();
// TODO block listener currently only handles a single org!!!
// TODO should it be initialised here?
const redis = app.get('redis') as Redis;
const network = app.get('networkOrg1') as Network;
await network.addBlockListener(blockEventHandler(redis));
app.listen(config.port, () => { app.listen(config.port, () => {
logger.info('Express server started on port: %d', config.port); logger.info('Express server started on port: %d', config.port);
}); });

View file

@ -18,6 +18,7 @@ import {
createGateway, createGateway,
createWallet, createWallet,
startRetryLoop, startRetryLoop,
blockEventHandler,
} from './fabric'; } from './fabric';
import { redis } from './redis'; import { redis } from './redis';
import * as config from './config'; import * as config from './config';
@ -81,9 +82,6 @@ export const createServer = async (): Promise<Application> => {
const contractsOrg1 = await getContracts(networkOrg1); const contractsOrg1 = await getContracts(networkOrg1);
app.set(config.mspIdOrg1, contractsOrg1); app.set(config.mspIdOrg1, contractsOrg1);
// TODO used for block listener, which needs fixing!
app.set('networkOrg1', networkOrg1);
const gatewayOrg2 = await createGateway( const gatewayOrg2 = await createGateway(
config.connectionProfileOrg2, config.connectionProfileOrg2,
config.mspIdOrg2, config.mspIdOrg2,
@ -100,6 +98,13 @@ export const createServer = async (): Promise<Application> => {
app.set('redis', redis); app.set('redis', redis);
logger.debug('Adding block listener to %s network', config.blockListenerOrg);
if (config.blockListenerOrg === config.ORG1) {
await networkOrg1.addBlockListener(blockEventHandler(redis));
} else {
await networkOrg2.addBlockListener(blockEventHandler(redis));
}
app.use('/', healthRouter); app.use('/', healthRouter);
app.use('/api/assets', authenticateApiKey, assetsRouter); app.use('/api/assets', authenticateApiKey, assetsRouter);
app.use('/api/transactions', authenticateApiKey, transactionsRouter); app.use('/api/transactions', authenticateApiKey, transactionsRouter);