Update transaction retry to use correct user

Also improves test coverage

Signed-off-by: James Taylor <jamest@uk.ibm.com>
This commit is contained in:
James Taylor 2021-08-17 17:34:45 +01:00
parent 82b1249f4e
commit bf91df7ef3
12 changed files with 776 additions and 230 deletions

View file

@ -1,21 +0,0 @@
import { RedisOptions } from 'ioredis';
class IORedis {
redisOptions: RedisOptions;
constructor(options: RedisOptions) {
this.redisOptions = options;
}
hincrby = jest.fn().mockReturnThis();
multi = jest.fn().mockReturnThis();
del = jest.fn().mockReturnThis();
zrem = jest.fn().mockReturnThis();
exec = jest.fn().mockReturnThis();
hset = jest.fn().mockReturnThis();
zadd = jest.fn().mockReturnThis();
}
export default IORedis;

View file

@ -144,7 +144,7 @@ mockBasicContract.createTransaction
const mockGetTransactionByIDTransaction = mock<Transaction>(); const mockGetTransactionByIDTransaction = mock<Transaction>();
mockGetTransactionByIDTransaction.evaluate mockGetTransactionByIDTransaction.evaluate
.calledWith('mychannel', 'txn1') .calledWith('mychannel', 'txn2')
.mockResolvedValue(processedTransactionBuffer); .mockResolvedValue(processedTransactionBuffer);
mockGetTransactionByIDTransaction.evaluate mockGetTransactionByIDTransaction.evaluate
.calledWith('mychannel', 'txn3') .calledWith('mychannel', 'txn3')

View file

@ -592,7 +592,7 @@ describe('Asset Transfer Besic REST API', () => {
it('GET should respond with json details for the specified transaction ID', async () => { it('GET should respond with json details for the specified transaction ID', async () => {
const response = await request(app) const response = await request(app)
.get('/api/transactions/txn1') .get('/api/transactions/txn2')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(200); expect(response.statusCode).toEqual(200);
expect(response.header).toHaveProperty( expect(response.header).toHaveProperty(

View file

@ -84,6 +84,7 @@ assetsRouter.post(
const transactionId = await submitTransaction( const transactionId = await submitTransaction(
contract, contract,
redis, redis,
mspId,
'CreateAsset', 'CreateAsset',
assetId, assetId,
req.body.color, req.body.color,
@ -235,6 +236,7 @@ assetsRouter.put(
const transactionId = await submitTransaction( const transactionId = await submitTransaction(
contract, contract,
redis, redis,
mspId,
'UpdateAsset', 'UpdateAsset',
assetId, assetId,
req.body.color, req.body.color,
@ -306,6 +308,7 @@ assetsRouter.patch(
const transactionId = await submitTransaction( const transactionId = await submitTransaction(
contract, contract,
redis, redis,
mspId,
'TransferAsset', 'TransferAsset',
assetId, assetId,
newOwner newOwner
@ -351,6 +354,7 @@ assetsRouter.delete('/:assetId', async (req: Request, res: Response) => {
const transactionId = await submitTransaction( const transactionId = await submitTransaction(
contract, contract,
redis, redis,
mspId,
'DeleteAsset', 'DeleteAsset',
assetId assetId
); );

View file

@ -7,12 +7,20 @@ import {
createWallet, createWallet,
getContracts, getContracts,
getNetwork, getNetwork,
retryTransaction, evatuateTransaction,
submitTransaction,
getBlockHeight,
startRetryLoop,
} from './fabric'; } from './fabric';
import * as config from './config'; import * as config from './config';
import IORedis from './__mocks__/IORedis'; import {
import { Redis } from 'ioredis'; AssetExistsError,
AssetNotFoundError,
TransactionError,
TransactionNotFoundError,
} from './errors';
import { import {
Contract, Contract,
Gateway, Gateway,
@ -22,19 +30,14 @@ import {
Wallet, Wallet,
} from 'fabric-network'; } from 'fabric-network';
import { mock } from 'jest-mock-extended'; import * as fabricProtos from 'fabric-protos';
import { MockProxy, mock } from 'jest-mock-extended';
import IORedis, { Redis } from 'ioredis';
import Long from 'long';
jest.mock('./config'); jest.mock('./config');
jest.mock('ioredis'); jest.mock('ioredis', () => require('ioredis-mock/jest'));
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
const redis = new IORedis(redisOptions) as unknown as Redis;
describe('Fabric', () => { describe('Fabric', () => {
describe('createWallet', () => { describe('createWallet', () => {
@ -101,66 +104,393 @@ describe('Fabric', () => {
}); });
}); });
describe('Testing retryTransaction', () => { describe('startRetryLoop', () => {
const transactionId = let redis: Redis;
let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>;
let mockContracts: Map<string, Contract>;
const mockTransactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; const mockKey = `txn:${mockTransactionId}`;
const args = '["test111","red",400,"Jean",101]'; const mockMspId = 'Org1MSP';
const timestamp = 1628078044362; const mockState = Buffer.from(
const savedTransaction = { `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}`
timestamp: timestamp.toString(), );
state: state, const mockArgs = '["test111","red",400,"Jean",101]';
retries: '', const mockTimestamp = 1628078044362;
args: args,
const flushPromises = () => {
jest.useRealTimers();
return new Promise((resolve) => setImmediate(resolve));
}; };
describe('Check retry increment ', () => { const addMockTransationDetails = async (redis: Redis) => {
const transactionId = await redis
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; .multi()
const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; .hset(
const args = '["test111","red",400,"Jean",101]'; mockKey,
const timestamp = 1628078044362; 'mspId',
const savedTransaction = { mockMspId,
timestamp: timestamp.toString(), 'state',
state: state, mockState,
retries: '', 'args',
args: args, mockArgs,
'timestamp',
mockTimestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', mockTimestamp, mockTransactionId)
.exec();
};
beforeEach(() => {
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
}; };
it('Transaction failure, check redis increment func call', async () => { redis = new IORedis(redisOptions) as unknown as Redis;
const mockTransaction = mock<Transaction>();
mockTransaction.submit.mockRejectedValue('MOCKERROR');
const mockContract = mock<Contract>();
mockContract.deserializeTransaction.mockReturnValue(mockTransaction);
savedTransaction.retries = '3'; mockTransaction = mock<Transaction>();
await retryTransaction( mockTransaction.submit
mockContract, .mockResolvedValue(Buffer.from('MOCK PAYLOAD'))
redis, .mockName('submit');
transactionId, mockContract = mock<Contract>();
savedTransaction mockContract.deserializeTransaction.mockReturnValue(mockTransaction);
); mockContracts = new Map<string, Contract>();
expect(redis.hincrby).toHaveBeenCalledTimes(1); mockContracts.set(mockMspId, mockContract);
});
jest.useFakeTimers();
}); });
describe('Transaction successful, check redis delete key func call ', () => { afterEach(() => {
it('call redis increment', async () => { jest.useRealTimers();
const mockTransaction = mock<Transaction>(); });
mockTransaction.submit.mockResolvedValue(Buffer.from('{}'));
const mockContract = mock<Contract>();
mockContract.deserializeTransaction.mockReturnValue(mockTransaction);
savedTransaction.retries = '3'; it('starts a retry loop which does nothing if there are no saved transaction details', async () => {
await retryTransaction( const getContractSpy = jest.spyOn(mockContracts, 'get');
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(getContractSpy).not.toBeCalled();
});
it('starts a retry loop which clears the saved details after succesfully retrying a transaction', async () => {
addMockTransationDetails(redis);
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
it('starts a retry loop which increments the retry count when a transaction fails', async () => {
addMockTransationDetails(redis);
mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR'));
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95',
]);
const savedTransaction = await (redis as Redis).hgetall(mockKey);
expect(savedTransaction.retries).toBe('1');
});
it('starts a retry loop which clears the saved details when a transaction fails as a duplicate', async () => {
addMockTransationDetails(redis);
const mockDuplicateTransactionError = new Error('MOCK ERROR');
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(mockDuplicateTransactionError as any).errors = [
{
endorsements: [
{
details: 'duplicate transaction found',
},
],
},
];
mockTransaction.submit.mockRejectedValue(mockDuplicateTransactionError);
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
it('starts a retry loop which clears the saved details when a transaction fails the final attempt', async () => {
addMockTransationDetails(redis);
await (redis as Redis).hincrby(mockKey, 'retries', 5);
mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR'));
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
});
describe('evatuateTransaction', () => {
const mockPayload = Buffer.from('MOCK PAYLOAD');
let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>;
beforeEach(() => {
mockTransaction = mock<Transaction>();
mockTransaction.evaluate.mockResolvedValue(mockPayload);
mockContract = mock<Contract>();
mockContract.createTransaction
.calledWith('txn')
.mockReturnValue(mockTransaction);
});
it('gets the result of evaluating a transaction', async () => {
const result = await evatuateTransaction(
mockContract,
'txn',
'arga',
'argb'
);
expect(result.toString()).toBe(mockPayload.toString());
});
it.each([
'the asset GOCHAINCODE already exists',
'Asset JAVACHAINCODE already exists',
'The asset JSCHAINCODE already exists',
])(
'throws an AssetExistsError an asset already exists error occurs: %s',
async (msg) => {
mockTransaction.evaluate.mockRejectedValue(new Error(msg));
await expect(async () => {
await evatuateTransaction(mockContract, 'txn', 'arga', 'argb');
}).rejects.toThrow(AssetExistsError);
}
);
it.each([
'the asset GOCHAINCODE does not exist',
'Asset JAVACHAINCODE does not exist',
'The asset JSCHAINCODE does not exist',
])(
'throws an AssetNotFoundError if an asset does not exist error occurs: %s',
async (msg) => {
mockTransaction.evaluate.mockRejectedValue(new Error(msg));
await expect(async () => {
await evatuateTransaction(mockContract, 'txn', 'arga', 'argb');
}).rejects.toThrow(AssetNotFoundError);
}
);
it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => {
mockTransaction.evaluate.mockRejectedValue(
new Error(
'Failed to get transaction with id txn, error Entry not found in index'
)
);
await expect(async () => {
await evatuateTransaction(mockContract, 'txn', 'arga', 'argb');
}).rejects.toThrow(TransactionNotFoundError);
});
it('throws a TransactionError for other errors', async () => {
mockTransaction.evaluate.mockRejectedValue(new Error('MOCK ERROR'));
await expect(async () => {
await evatuateTransaction(mockContract, 'txn', 'arga', 'argb');
}).rejects.toThrow(TransactionError);
});
});
describe('submitTransaction', () => {
let redis: Redis;
const mockPayload = Buffer.from('MOCK PAYLOAD');
let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>;
beforeEach(async () => {
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
redis = new IORedis(redisOptions) as unknown as Redis;
mockTransaction = mock<Transaction>();
mockTransaction.submit.mockResolvedValue(mockPayload);
mockTransaction.getTransactionId.mockReturnValue('MOCK TXN ID');
mockTransaction.serialize.mockReturnValue(Buffer.from('MOCK TXN STATE'));
mockContract = mock<Contract>();
mockContract.createTransaction
.calledWith('txn')
.mockReturnValue(mockTransaction);
});
it('gets the transaction ID of the submitted transaction', async () => {
const result = await submitTransaction(
mockContract,
redis,
'mspid',
'txn',
'arga',
'argb'
);
expect(result).toBe('MOCK TXN ID');
});
it.each([
'the asset GOCHAINCODE already exists',
'Asset JAVACHAINCODE already exists',
'The asset JSCHAINCODE already exists',
])(
'throws an AssetExistsError an asset already exists error occurs: %s',
async (msg) => {
mockTransaction.submit.mockRejectedValue(new Error(msg));
await expect(async () => {
await submitTransaction(
mockContract,
redis,
'mspid',
'txn',
'arga',
'argb'
);
}).rejects.toThrow(AssetExistsError);
}
);
it.each([
'the asset GOCHAINCODE does not exist',
'Asset JAVACHAINCODE does not exist',
'The asset JSCHAINCODE does not exist',
])(
'throws an AssetNotFoundError if an asset does not exist error occurs: %s',
async (msg) => {
mockTransaction.submit.mockRejectedValue(new Error(msg));
await expect(async () => {
await submitTransaction(
mockContract,
redis,
'mspid',
'txn',
'arga',
'argb'
);
}).rejects.toThrow(AssetNotFoundError);
}
);
it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => {
mockTransaction.submit.mockRejectedValue(
new Error(
'Failed to get transaction with id txn, error Entry not found in index'
)
);
await expect(async () => {
await submitTransaction(
mockContract, mockContract,
redis, redis,
transactionId, 'mspid',
savedTransaction 'txn',
'arga',
'argb'
); );
}).rejects.toThrow(TransactionNotFoundError);
});
expect(redis.del).toHaveBeenCalledTimes(1); it('throws a TransactionError for other errors', async () => {
}); mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR'));
await expect(async () => {
await submitTransaction(
mockContract,
redis,
'mspid',
'txn',
'arga',
'argb'
);
}).rejects.toThrow(TransactionError);
});
});
describe('getBlockHeight', () => {
it('gets the current block height', async () => {
const mockBlockchainInfoProto =
fabricProtos.common.BlockchainInfo.create();
mockBlockchainInfoProto.height = 42;
const mockBlockchainInfoBuffer = Buffer.from(
fabricProtos.common.BlockchainInfo.encode(
mockBlockchainInfoProto
).finish()
);
const mockContract = mock<Contract>();
mockContract.evaluateTransaction
.calledWith('GetChainInfo', 'mychannel')
.mockResolvedValue(mockBlockchainInfoBuffer);
const result = (await getBlockHeight(mockContract)) as Long;
expect(result.toInt()).toStrictEqual(42);
}); });
}); });
}); });

View file

@ -20,8 +20,10 @@ import * as config from './config';
import { logger } from './logger'; import { logger } from './logger';
import { import {
storeTransactionDetails, storeTransactionDetails,
getRetryTransactionDetails,
clearTransactionDetails, clearTransactionDetails,
incrementRetryCount, incrementRetryCount,
TransactionDetails,
} from './redis'; } from './redis';
import { import {
AssetExistsError, AssetExistsError,
@ -115,52 +117,48 @@ export const getContracts = async (
return { assetContract, qsccContract }; return { assetContract, qsccContract };
}; };
export const startRetryLoop = (contract: Contract, redis: Redis): void => { export const startRetryLoop = (
setInterval( contracts: Map<string, Contract>,
async (redis) => { redis: Redis
try { ): void => {
const pendingTransactionCount = await (redis as Redis).zcard( const retryInterval = setInterval(
'index:txn:timestamp' async (contracts, redis) => {
); if (logger.isLevelEnabled('debug')) {
logger.debug( try {
'Transactions awaiting retry: %d', const pendingTransactionCount = await (redis as Redis).zcard(
pendingTransactionCount 'index:txn:timestamp'
); );
logger.debug(
// TODO pick a random transaction instead to reduce chances of '%d transactions awaiting retry',
// clashing with other instances? Currently no zrandmember pendingTransactionCount
// command though... );
// https://github.com/luin/ioredis/issues/1374 } catch (err) {
const transactionIds = await (redis as Redis).zrange( logger.warn({ err }, 'Error getting pending transaction count');
'index:txn:timestamp', }
-1, }
-1
); const savedTransaction = await getRetryTransactionDetails(redis);
if (transactionIds.length > 0) { if (savedTransaction) {
const transactionId = transactionIds[0]; const contract = contracts.get(savedTransaction.mspId);
const savedTransaction = await (redis as Redis).hgetall(
`txn:${transactionId}` if (contract) {
await retryTransaction(contract, redis, savedTransaction);
} else {
logger.error(
'No contract found for %s to retry transaction %s',
savedTransaction.mspId,
savedTransaction.transactionId
); );
if (parseInt(savedTransaction.retries) >= config.maxRetryCount) {
await clearTransactionDetails(redis, transactionId);
} else {
await retryTransaction(
contract,
redis,
transactionId,
savedTransaction
);
}
} }
} catch (err) {
// TODO just log?
logger.error(err, 'error getting saved transaction state');
} }
}, },
config.retryDelay, config.retryDelay,
contracts,
redis redis
); );
retryInterval.unref();
}; };
export const evatuateTransaction = async ( export const evatuateTransaction = async (
@ -173,7 +171,10 @@ export const evatuateTransaction = async (
try { try {
const payload = await txn.evaluate(...transactionArgs); const payload = await txn.evaluate(...transactionArgs);
logger.debug({ payload }, 'Evaluate transaction response received'); logger.debug(
{ transactionId: txnId, payload: payload.toString() },
'Evaluate transaction response received'
);
return payload; return payload;
} catch (err) { } catch (err) {
throw handleError(txnId, err); throw handleError(txnId, err);
@ -183,6 +184,7 @@ export const evatuateTransaction = async (
export const submitTransaction = async ( export const submitTransaction = async (
contract: Contract, contract: Contract,
redis: Redis, redis: Redis,
mspId: string,
transactionName: string, transactionName: string,
...transactionArgs: string[] ...transactionArgs: string[]
): Promise<string> => { ): Promise<string> => {
@ -195,7 +197,14 @@ export const submitTransaction = async (
try { try {
// Store the transaction details and set the event handler in case there // Store the transaction details and set the event handler in case there
// are problems later with commiting the transaction // are problems later with commiting the transaction
await storeTransactionDetails(redis, txnId, txnState, txnArgs, timestamp); await storeTransactionDetails(
redis,
txnId,
mspId,
txnState,
txnArgs,
timestamp
);
txn.setEventHandler(DefaultEventHandlerStrategies.NONE); txn.setEventHandler(DefaultEventHandlerStrategies.NONE);
await txn.submit(...transactionArgs); await txn.submit(...transactionArgs);
} catch (err) { } catch (err) {
@ -212,6 +221,7 @@ export const submitTransaction = async (
// Unfortunately the chaincode samples do not use error codes, and the error // Unfortunately the chaincode samples do not use error codes, and the error
// message text is not the same for each implementation // message text is not the same for each implementation
// TODO move to errors.ts?
const handleError = (transactionId: string, err: Error): Error => { const handleError = (transactionId: string, err: Error): Error => {
// This regex needs to match the following error messages: // This regex needs to match the following error messages:
// "the asset %s already exists" // "the asset %s already exists"
@ -266,40 +276,55 @@ const handleError = (transactionId: string, err: Error): Error => {
return new TransactionError('Transaction error', transactionId); return new TransactionError('Transaction error', transactionId);
}; };
export const retryTransaction = async ( const retryTransaction = async (
contract: Contract, contract: Contract,
redis: Redis, redis: Redis,
transactionId: string, savedTransaction: TransactionDetails
savedTransaction: Record<string, string>
): Promise<void> => { ): Promise<void> => {
logger.debug('Retrying transaction %s', transactionId); logger.debug('Retrying transaction %s', savedTransaction.transactionId);
try { try {
const transaction = contract.deserializeTransaction( const transaction = contract.deserializeTransaction(
Buffer.from(savedTransaction.state) savedTransaction.transactionState
); );
const args: string[] = JSON.parse(savedTransaction.args); const args: string[] = JSON.parse(savedTransaction.transactionArgs);
await transaction.submit(...args); const payload = await transaction.submit(...args);
await clearTransactionDetails(redis, transactionId); logger.debug(
{
transactionId: savedTransaction.transactionId,
payload: payload.toString(),
},
'Retry transaction response received'
);
await clearTransactionDetails(redis, savedTransaction.transactionId);
} catch (err) { } catch (err) {
if (isDuplicateTransaction(err)) { if (isDuplicateTransactionError(err)) {
logger.warn('Transaction %s has already been committed', transactionId); logger.warn(
await clearTransactionDetails(redis, transactionId); 'Transaction %s has already been committed',
savedTransaction.transactionId
);
await clearTransactionDetails(redis, savedTransaction.transactionId);
} else { } else {
// TODO check for retry limit and update timestamp
logger.warn( logger.warn(
err, err,
'Retry %d failed for transaction %s', 'Retry %d failed for transaction %s',
savedTransaction.retries, savedTransaction.retries,
transactionId savedTransaction.transactionId
); );
await incrementRetryCount(redis, transactionId);
if (savedTransaction.retries < config.maxRetryCount) {
await incrementRetryCount(redis, savedTransaction.transactionId);
} else {
await clearTransactionDetails(redis, savedTransaction.transactionId);
}
} }
} }
}; };
const isDuplicateTransaction = (error: { // TODO move to errors.ts?
const isDuplicateTransactionError = (error: {
errors: { endorsements: { details: string }[] }[]; errors: { endorsements: { details: string }[] }[];
}) => { }) => {
// TODO this is horrible! Isn't it possible to check for TxValidationCode DUPLICATE_TXID somehow? // TODO this is horrible! Isn't it possible to check for TxValidationCode DUPLICATE_TXID somehow?

View file

@ -2,24 +2,21 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import { Contract, Network } from 'fabric-network'; import { Network } from 'fabric-network';
import { Redis } from 'ioredis'; import { Redis } from 'ioredis';
import * as config from './config'; import * as config from './config';
import { startRetryLoop, blockEventHandler } from './fabric'; import { blockEventHandler } from './fabric';
import { logger } from './logger'; import { logger } from './logger';
import { createServer } from './server'; import { createServer } from './server';
async function main() { async function main() {
const app = await createServer(); const app = await createServer();
// TODO block listener and retry logic currently only handles a single org!!! // TODO block listener currently only handles a single org!!!
// TODO should these be initialised here? // TODO should it be initialised here?
const contract = app.get(config.mspIdOrg1).assetContract as Contract;
const redis = app.get('redis') as Redis; const redis = app.get('redis') as Redis;
const network = app.get('networkOrg1') as Network; const network = app.get('networkOrg1') as Network;
await network.addBlockListener(blockEventHandler(redis)); await network.addBlockListener(blockEventHandler(redis));
startRetryLoop(contract, redis);
app.listen(config.port, () => { app.listen(config.port, () => {
logger.info('Express server started on port: %d', config.port); logger.info('Express server started on port: %d', config.port);

View file

@ -1,3 +1,7 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/
import pino from 'pino'; import pino from 'pino';
import * as config from './config'; import * as config from './config';

View file

@ -1,75 +1,184 @@
import IORedis from './__mocks__/IORedis'; /*
* SPDX-License-Identifier: Apache-2.0
*/
import * as config from './config'; import * as config from './config';
import { Redis } from 'ioredis'; import IORedis, { Redis } from 'ioredis';
import { import {
clearTransactionDetails, clearTransactionDetails,
incrementRetryCount, incrementRetryCount,
storeTransactionDetails, storeTransactionDetails,
getTransactionDetails,
getRetryTransactionDetails,
} from './redis'; } from './redis';
jest.mock('ioredis'); jest.mock('ioredis', () => require('ioredis-mock/jest'));
jest.mock('./config'); jest.mock('./config');
const redisOptions = { describe('Redis', () => {
port: config.redisPort, let redis: Redis;
host: config.redisHost,
username: config.redisUsername, const mockTransactionId =
password: config.redisPassword,
};
const redis = new IORedis(redisOptions) as unknown as Redis;
describe('Testing increment retries ', () => {
const transactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
it('Should increment retries for valid transction id', async () => { const mockKey = `txn:${mockTransactionId}`;
await incrementRetryCount(redis, transactionId); const mockMspId = 'Org1MSP';
expect(redis.hincrby).toHaveBeenCalledTimes(1); const mockState = Buffer.from(
`{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}`
);
const mockArgs = '["test111","red",400,"Jean",101]';
const mockTimestamp = 1628078044362;
const addMockTransationDetails = async (redis: Redis) => {
await redis
.multi()
.hset(
mockKey,
'mspId',
mockMspId,
'state',
mockState,
'args',
mockArgs,
'timestamp',
mockTimestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', mockTimestamp, mockTransactionId)
.exec();
};
beforeEach(async () => {
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
redis = new IORedis(redisOptions) as unknown as Redis;
});
describe('storeTransactionDetails', () => {
it('stores transaction details as a hash', async () => {
await storeTransactionDetails(
redis,
mockTransactionId,
mockMspId,
mockState,
mockArgs,
mockTimestamp
);
const storedTransaction = await redis.hgetall(mockKey);
const expectedTransaction = {
mspId: mockMspId,
state: mockState,
args: mockArgs,
retries: '0',
timestamp: mockTimestamp.toString(),
};
expect(storedTransaction).toStrictEqual(expectedTransaction);
});
it('adds the transaction ID to the sorted set timestamp index', async () => {
await storeTransactionDetails(
redis,
mockTransactionId,
mockMspId,
mockState,
mockArgs,
mockTimestamp
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([mockTransactionId]);
});
// TODO this seems to work for spying/mocking...
// jest.spyOn(redis, 'multi').mock...
// but haven't worked out how to spy on the hset, zadd, exec in that chain
// Ask Mark?
it.todo('handles an error from redis');
}); });
it('Should not increment retries for empty transaction id ', async () => { describe('getTransactionDetails', () => {
await incrementRetryCount(redis, ''); it('gets the transaction details from a hash', async () => {
expect(redis.hincrby).toHaveBeenCalledTimes(0); await addMockTransationDetails(redis);
});
});
describe('Testing storeTransactionDetails ', () => { const details = await getTransactionDetails(redis, mockTransactionId);
const args = '["test111","red",400,"Jean",101]';
const timestamp = 1628078044362; expect(details).toStrictEqual({
it('Should store details for valid transction Id', async () => { transactionId: mockTransactionId,
const transactionId = mspId: mockMspId,
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; transactionState: mockState,
const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; transactionArgs: mockArgs,
await storeTransactionDetails( retries: 0,
redis, timestamp: mockTimestamp,
transactionId, });
Buffer.from(state), });
args,
timestamp it.todo('handles an error from redis');
});
describe('getRetryTransactionDetails', () => {
it('gets the oldest transaction details from a hash', async () => {
await addMockTransationDetails(redis);
const details = await getRetryTransactionDetails(redis);
expect(details).toStrictEqual({
transactionId: mockTransactionId,
mspId: mockMspId,
transactionState: mockState,
transactionArgs: mockArgs,
retries: 0,
timestamp: mockTimestamp,
});
});
it('gets undefined if there are no transactions to retry', async () => {
const details = await getRetryTransactionDetails(redis);
expect(details).toBeUndefined();
});
it.todo('handles an error from redis');
});
describe('clearTransactionDetails', () => {
it('removes the transaction details hash', async () => {
await addMockTransationDetails(redis);
await clearTransactionDetails(redis, mockTransactionId);
const storedTransaction = await redis.hgetall(mockKey);
expect(storedTransaction).not.toHaveProperty('state');
});
it('removes the transaction ID from the sorted set timestamp index', async () => {
await addMockTransationDetails(redis);
await clearTransactionDetails(redis, mockTransactionId);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
});
describe('incrementRetryCount', () => {
it('increments the retries value in the transction details hash', async () => {
await addMockTransationDetails(redis);
await incrementRetryCount(redis, mockTransactionId);
const retries = await redis.hget(mockKey, 'retries');
expect(retries).toBe('1');
});
it.todo(
'updates the position of the transaction ID in the sorted set timestamp index'
); );
expect(redis.hset).toHaveBeenCalledTimes(1);
expect(redis.zadd).toHaveBeenCalledTimes(1);
});
it('Should not store details for empty transction Id', async () => { it.todo('handles an error from redis');
const transactionId = '';
const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`;
await storeTransactionDetails(
redis,
transactionId,
Buffer.from(state),
args,
timestamp
);
expect(redis.hset).toHaveBeenCalledTimes(0);
expect(redis.zadd).toHaveBeenCalledTimes(0);
});
});
describe('Testing clearTransactionDetails ', () => {
it('Should clear details ', async () => {
const transactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
await clearTransactionDetails(redis, transactionId);
expect(redis.del).toHaveBeenCalledTimes(1);
expect(redis.zrem).toHaveBeenCalledTimes(1);
}); });
}); });

View file

@ -1,5 +1,12 @@
/* /*
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*
* This sample includes basic retry logic so it needs somewhere to store
* transaction details in case the app restarts for any reason, and Redis is
* just one of the options available
*
* Note: This implementation is not designed with multiple instances of the
* REST app in mind, which is likely to be required in a production environment
*/ */
import IORedis, { Redis, RedisOptions } from 'ioredis'; import IORedis, { Redis, RedisOptions } from 'ioredis';
@ -16,29 +23,44 @@ const redisOptions: RedisOptions = {
export const redis = new IORedis(redisOptions); export const redis = new IORedis(redisOptions);
export type TransactionDetails = {
transactionId: string;
mspId: string;
transactionState: Buffer;
transactionArgs: string;
timestamp: number;
retries: number;
};
/*
* Store enough information in order to resubmit a transaction
*/
export const storeTransactionDetails = async ( export const storeTransactionDetails = async (
redis: Redis, redis: Redis,
transactionId: string, transactionId: string,
mspId: string,
transactionState: Buffer, transactionState: Buffer,
transactionArgs: string, transactionArgs: string,
timestamp: number timestamp: number
): Promise<void> => { ): Promise<void> => {
try { try {
if (transactionId.length === 0) {
throw new Error('Empty transaction Id found');
}
const key = `txn:${transactionId}`; const key = `txn:${transactionId}`;
logger.debug( logger.debug(
'Storing transaction details. Key: %s State: %s Args: %s Timestamp: %d', {
key, key,
transactionState, mspId,
transactionArgs, transactionState,
timestamp transactionArgs,
timestamp,
},
'Storing transaction details'
); );
await redis await redis
.multi() .multi()
.hset( .hset(
key, key,
'mspId',
mspId,
'state', 'state',
transactionState, transactionState,
'args', 'args',
@ -51,14 +73,84 @@ export const storeTransactionDetails = async (
.zadd('index:txn:timestamp', timestamp, transactionId) .zadd('index:txn:timestamp', timestamp, transactionId)
.exec(); .exec();
} catch (err) { } catch (err) {
// TODO just log?!
logger.error( logger.error(
err, { err },
'Error storing transaction details. ID %s', 'Error storing details for transaction ID %s',
transactionId transactionId
); );
} }
}; };
/*
* Get the information required to resubmit a transaction
*/
export const getTransactionDetails = async (
redis: Redis,
transactionId: string
): Promise<TransactionDetails | undefined> => {
try {
const savedTransaction = await (redis as Redis).hgetall(
`txn:${transactionId}`
);
logger.debug(
{ transactionId: transactionId, state: savedTransaction },
'Got transaction details'
);
const transactionDetails = {
transactionId: transactionId,
mspId: savedTransaction.mspId,
transactionState: Buffer.from(savedTransaction.state),
transactionArgs: savedTransaction.args,
timestamp: parseInt(savedTransaction.timestamp),
retries: parseInt(savedTransaction.retries),
};
return transactionDetails;
} catch (err) {
// TODO just log?!
logger.error(
{ err },
'Error getting details for transaction ID %s',
transactionId
);
}
};
/*
* Get the oldest transaction details
*/
export const getRetryTransactionDetails = async (
redis: Redis
): Promise<TransactionDetails | undefined> => {
try {
const transactionIds = await (redis as Redis).zrange(
'index:txn:timestamp',
-1,
-1
);
if (transactionIds.length > 0) {
const transactionId = transactionIds[0];
const savedTransaction = await getTransactionDetails(
redis,
transactionId
);
return savedTransaction;
}
} catch (err) {
// TODO just log?!
logger.error(
{ err },
'Error getting details for next transaction to retry'
);
}
};
/*
* Delete transaction details
*/
export const clearTransactionDetails = async ( export const clearTransactionDetails = async (
redis: Redis, redis: Redis,
transactionId: string transactionId: string
@ -72,16 +164,20 @@ export const clearTransactionDetails = async (
.zrem('index:txn:timestamp', transactionId) .zrem('index:txn:timestamp', transactionId)
.exec(); .exec();
} catch (err) { } catch (err) {
// TODO just log?!
logger.error( logger.error(
err, { err },
'Error remove saved transaction state for transaction ID %s', 'Error remove details for transaction ID %s',
transactionId transactionId
); );
} }
}; };
// TODO add getTransaction etc. helpers? /*
* Increment the number of times the transaction has been retried
* TODO needs to update the timestamp and index as well
*/
export const incrementRetryCount = async ( export const incrementRetryCount = async (
redis: Redis, redis: Redis,
transactionId: string transactionId: string
@ -89,11 +185,9 @@ export const incrementRetryCount = async (
const key = `txn:${transactionId}`; const key = `txn:${transactionId}`;
logger.debug('Incrementing retries fortransaction Key: %s', key); logger.debug('Incrementing retries fortransaction Key: %s', key);
try { try {
if (transactionId.length === 0) {
throw new Error('Empty transaction Id found');
}
await (redis as Redis).hincrby(`txn:${transactionId}`, 'retries', 1); await (redis as Redis).hincrby(`txn:${transactionId}`, 'retries', 1);
} catch (err) { } catch (err) {
// TODO just log?!
logger.error( logger.error(
err, err,
'Error incrementing retries for transaction ID %s', 'Error incrementing retries for transaction ID %s',

View file

@ -6,6 +6,7 @@ import helmet from 'helmet';
import { StatusCodes, getReasonPhrase } from 'http-status-codes'; import { StatusCodes, getReasonPhrase } from 'http-status-codes';
import express, { Application, NextFunction, Request, Response } from 'express'; import express, { Application, NextFunction, Request, Response } from 'express';
import pinoMiddleware from 'pino-http'; import pinoMiddleware from 'pino-http';
import { Contract } from 'fabric-network';
import { logger } from './logger'; import { logger } from './logger';
import { assetsRouter } from './assets.router'; import { assetsRouter } from './assets.router';
@ -16,6 +17,7 @@ import {
getNetwork, getNetwork,
createGateway, createGateway,
createWallet, createWallet,
startRetryLoop,
} from './fabric'; } from './fabric';
import { redis } from './redis'; import { redis } from './redis';
import * as config from './config'; import * as config from './config';
@ -91,6 +93,11 @@ export const createServer = async (): Promise<Application> => {
const contractsOrg2 = await getContracts(networkOrg2); const contractsOrg2 = await getContracts(networkOrg2);
app.set(config.mspIdOrg2, contractsOrg2); app.set(config.mspIdOrg2, contractsOrg2);
const assetContracts = new Map<string, Contract>();
assetContracts.set(config.mspIdOrg1, contractsOrg1.assetContract);
assetContracts.set(config.mspIdOrg2, contractsOrg2.assetContract);
startRetryLoop(assetContracts, redis);
app.set('redis', redis); app.set('redis', redis);
app.use('/', healthRouter); app.use('/', healthRouter);

View file

@ -7,6 +7,7 @@ import { Contract } from 'fabric-network';
import { protos } from 'fabric-protos'; import { protos } from 'fabric-protos';
import { getReasonPhrase, StatusCodes } from 'http-status-codes'; import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { Redis } from 'ioredis'; import { Redis } from 'ioredis';
import { getTransactionDetails } from './redis';
import { evatuateTransaction } from './fabric'; import { evatuateTransaction } from './fabric';
import { logger } from './logger'; import { logger } from './logger';
import * as config from './config'; import * as config from './config';
@ -33,18 +34,14 @@ transactionsRouter.get(
const redis = req.app.get('redis') as Redis; const redis = req.app.get('redis') as Redis;
try { try {
const savedTransaction = await (redis as Redis).hgetall( const savedTransaction = await getTransactionDetails(
`txn:${transactionId}` redis,
); transactionId
logger.debug(
{ transactionId: transactionId, state: savedTransaction },
'Saved transaction state'
); );
if (savedTransaction.state) { if (savedTransaction?.transactionState) {
foundTransaction = true; foundTransaction = true;
const retries = parseInt(savedTransaction.retries); if (savedTransaction.retries > 0) {
if (retries > 0) {
progress = 'RETRYING'; progress = 'RETRYING';
} else { } else {
progress = 'ACCEPTED'; progress = 'ACCEPTED';