From bf91df7ef3dc4ee9cf01070fa097fac0df08702e Mon Sep 17 00:00:00 2001 From: James Taylor Date: Tue, 17 Aug 2021 17:34:45 +0100 Subject: [PATCH] Update transaction retry to use correct user Also improves test coverage Signed-off-by: James Taylor --- .../src/__mocks__/IORedis.ts | 21 - .../src/__mocks__/fabric-network.ts | 2 +- .../src/__tests__/api.test.ts | 2 +- .../rest-api-typescript/src/assets.router.ts | 4 + .../rest-api-typescript/src/fabric.spec.ts | 452 +++++++++++++++--- .../rest-api-typescript/src/fabric.ts | 137 +++--- .../rest-api-typescript/src/index.ts | 11 +- .../rest-api-typescript/src/logger.ts | 4 + .../rest-api-typescript/src/redis.spec.ts | 225 ++++++--- .../rest-api-typescript/src/redis.ts | 126 ++++- .../rest-api-typescript/src/server.ts | 7 + .../src/transactions.router.ts | 15 +- 12 files changed, 776 insertions(+), 230 deletions(-) delete mode 100644 asset-transfer-basic/rest-api-typescript/src/__mocks__/IORedis.ts diff --git a/asset-transfer-basic/rest-api-typescript/src/__mocks__/IORedis.ts b/asset-transfer-basic/rest-api-typescript/src/__mocks__/IORedis.ts deleted file mode 100644 index bc31167e..00000000 --- a/asset-transfer-basic/rest-api-typescript/src/__mocks__/IORedis.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { RedisOptions } from 'ioredis'; - -class IORedis { - redisOptions: RedisOptions; - constructor(options: RedisOptions) { - this.redisOptions = options; - } - - hincrby = jest.fn().mockReturnThis(); - multi = jest.fn().mockReturnThis(); - del = jest.fn().mockReturnThis(); - - zrem = jest.fn().mockReturnThis(); - - exec = jest.fn().mockReturnThis(); - - hset = jest.fn().mockReturnThis(); - zadd = jest.fn().mockReturnThis(); -} - -export default IORedis; diff --git a/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts b/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts index 626f34e6..4f58ae24 100644 --- a/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts +++ b/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts @@ -144,7 +144,7 @@ mockBasicContract.createTransaction const mockGetTransactionByIDTransaction = mock(); mockGetTransactionByIDTransaction.evaluate - .calledWith('mychannel', 'txn1') + .calledWith('mychannel', 'txn2') .mockResolvedValue(processedTransactionBuffer); mockGetTransactionByIDTransaction.evaluate .calledWith('mychannel', 'txn3') diff --git a/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts b/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts index 9b686f63..5b065426 100644 --- a/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts +++ b/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts @@ -592,7 +592,7 @@ describe('Asset Transfer Besic REST API', () => { it('GET should respond with json details for the specified transaction ID', async () => { const response = await request(app) - .get('/api/transactions/txn1') + .get('/api/transactions/txn2') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); expect(response.statusCode).toEqual(200); expect(response.header).toHaveProperty( diff --git a/asset-transfer-basic/rest-api-typescript/src/assets.router.ts b/asset-transfer-basic/rest-api-typescript/src/assets.router.ts index 5d75c65a..6bac3866 100644 --- a/asset-transfer-basic/rest-api-typescript/src/assets.router.ts +++ b/asset-transfer-basic/rest-api-typescript/src/assets.router.ts @@ -84,6 +84,7 @@ assetsRouter.post( const transactionId = await submitTransaction( contract, redis, + mspId, 'CreateAsset', assetId, req.body.color, @@ -235,6 +236,7 @@ assetsRouter.put( const transactionId = await submitTransaction( contract, redis, + mspId, 'UpdateAsset', assetId, req.body.color, @@ -306,6 +308,7 @@ assetsRouter.patch( const transactionId = await submitTransaction( contract, redis, + mspId, 'TransferAsset', assetId, newOwner @@ -351,6 +354,7 @@ assetsRouter.delete('/:assetId', async (req: Request, res: Response) => { const transactionId = await submitTransaction( contract, redis, + mspId, 'DeleteAsset', assetId ); diff --git a/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts b/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts index 7c37bf00..b74ddf7a 100644 --- a/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts @@ -7,12 +7,20 @@ import { createWallet, getContracts, getNetwork, - retryTransaction, + evatuateTransaction, + submitTransaction, + getBlockHeight, + startRetryLoop, } from './fabric'; import * as config from './config'; -import IORedis from './__mocks__/IORedis'; -import { Redis } from 'ioredis'; +import { + AssetExistsError, + AssetNotFoundError, + TransactionError, + TransactionNotFoundError, +} from './errors'; + import { Contract, Gateway, @@ -22,19 +30,14 @@ import { Wallet, } from 'fabric-network'; -import { mock } from 'jest-mock-extended'; +import * as fabricProtos from 'fabric-protos'; + +import { MockProxy, mock } from 'jest-mock-extended'; +import IORedis, { Redis } from 'ioredis'; +import Long from 'long'; jest.mock('./config'); -jest.mock('ioredis'); - -const redisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, -}; - -const redis = new IORedis(redisOptions) as unknown as Redis; +jest.mock('ioredis', () => require('ioredis-mock/jest')); describe('Fabric', () => { describe('createWallet', () => { @@ -101,66 +104,393 @@ describe('Fabric', () => { }); }); - describe('Testing retryTransaction', () => { - const transactionId = + describe('startRetryLoop', () => { + let redis: Redis; + let mockTransaction: MockProxy; + let mockContract: MockProxy; + let mockContracts: Map; + + const mockTransactionId = '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; - const args = '["test111","red",400,"Jean",101]'; - const timestamp = 1628078044362; - const savedTransaction = { - timestamp: timestamp.toString(), - state: state, - retries: '', - args: args, + const mockKey = `txn:${mockTransactionId}`; + const mockMspId = 'Org1MSP'; + const mockState = Buffer.from( + `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}` + ); + const mockArgs = '["test111","red",400,"Jean",101]'; + const mockTimestamp = 1628078044362; + + const flushPromises = () => { + jest.useRealTimers(); + return new Promise((resolve) => setImmediate(resolve)); }; - describe('Check retry increment ', () => { - const transactionId = - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; - const args = '["test111","red",400,"Jean",101]'; - const timestamp = 1628078044362; - const savedTransaction = { - timestamp: timestamp.toString(), - state: state, - retries: '', - args: args, + const addMockTransationDetails = async (redis: Redis) => { + await redis + .multi() + .hset( + mockKey, + 'mspId', + mockMspId, + 'state', + mockState, + 'args', + mockArgs, + 'timestamp', + mockTimestamp, + 'retries', + '0' + ) + .zadd('index:txn:timestamp', mockTimestamp, mockTransactionId) + .exec(); + }; + + beforeEach(() => { + const redisOptions = { + port: config.redisPort, + host: config.redisHost, + username: config.redisUsername, + password: config.redisPassword, }; - it('Transaction failure, check redis increment func call', async () => { - const mockTransaction = mock(); - mockTransaction.submit.mockRejectedValue('MOCKERROR'); - const mockContract = mock(); - mockContract.deserializeTransaction.mockReturnValue(mockTransaction); + redis = new IORedis(redisOptions) as unknown as Redis; - savedTransaction.retries = '3'; - await retryTransaction( - mockContract, - redis, - transactionId, - savedTransaction - ); - expect(redis.hincrby).toHaveBeenCalledTimes(1); - }); + mockTransaction = mock(); + mockTransaction.submit + .mockResolvedValue(Buffer.from('MOCK PAYLOAD')) + .mockName('submit'); + mockContract = mock(); + mockContract.deserializeTransaction.mockReturnValue(mockTransaction); + mockContracts = new Map(); + mockContracts.set(mockMspId, mockContract); + + jest.useFakeTimers(); }); - describe('Transaction successful, check redis delete key func call ', () => { - it('call redis increment', async () => { - const mockTransaction = mock(); - mockTransaction.submit.mockResolvedValue(Buffer.from('{}')); - const mockContract = mock(); - mockContract.deserializeTransaction.mockReturnValue(mockTransaction); + afterEach(() => { + jest.useRealTimers(); + }); - savedTransaction.retries = '3'; - await retryTransaction( + it('starts a retry loop which does nothing if there are no saved transaction details', async () => { + const getContractSpy = jest.spyOn(mockContracts, 'get'); + + startRetryLoop(mockContracts, redis); + jest.runOnlyPendingTimers(); + await flushPromises(); + + expect(getContractSpy).not.toBeCalled(); + }); + + it('starts a retry loop which clears the saved details after succesfully retrying a transaction', async () => { + addMockTransationDetails(redis); + + startRetryLoop(mockContracts, redis); + jest.runOnlyPendingTimers(); + await flushPromises(); + + expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); + expect(mockTransaction.submit).toBeCalledWith( + 'test111', + 'red', + 400, + 'Jean', + 101 + ); + + const index = await redis.zrange('index:txn:timestamp', 0, -1); + expect(index).toStrictEqual([]); + }); + + it('starts a retry loop which increments the retry count when a transaction fails', async () => { + addMockTransationDetails(redis); + mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); + + startRetryLoop(mockContracts, redis); + jest.runOnlyPendingTimers(); + await flushPromises(); + + expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); + expect(mockTransaction.submit).toBeCalledWith( + 'test111', + 'red', + 400, + 'Jean', + 101 + ); + + const index = await redis.zrange('index:txn:timestamp', 0, -1); + expect(index).toStrictEqual([ + '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95', + ]); + + const savedTransaction = await (redis as Redis).hgetall(mockKey); + expect(savedTransaction.retries).toBe('1'); + }); + + it('starts a retry loop which clears the saved details when a transaction fails as a duplicate', async () => { + addMockTransationDetails(redis); + const mockDuplicateTransactionError = new Error('MOCK ERROR'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (mockDuplicateTransactionError as any).errors = [ + { + endorsements: [ + { + details: 'duplicate transaction found', + }, + ], + }, + ]; + mockTransaction.submit.mockRejectedValue(mockDuplicateTransactionError); + + startRetryLoop(mockContracts, redis); + jest.runOnlyPendingTimers(); + await flushPromises(); + + expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); + expect(mockTransaction.submit).toBeCalledWith( + 'test111', + 'red', + 400, + 'Jean', + 101 + ); + + const index = await redis.zrange('index:txn:timestamp', 0, -1); + expect(index).toStrictEqual([]); + }); + + it('starts a retry loop which clears the saved details when a transaction fails the final attempt', async () => { + addMockTransationDetails(redis); + await (redis as Redis).hincrby(mockKey, 'retries', 5); + mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); + + startRetryLoop(mockContracts, redis); + jest.runOnlyPendingTimers(); + await flushPromises(); + + expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); + expect(mockTransaction.submit).toBeCalledWith( + 'test111', + 'red', + 400, + 'Jean', + 101 + ); + + const index = await redis.zrange('index:txn:timestamp', 0, -1); + expect(index).toStrictEqual([]); + }); + }); + + describe('evatuateTransaction', () => { + const mockPayload = Buffer.from('MOCK PAYLOAD'); + let mockTransaction: MockProxy; + let mockContract: MockProxy; + + beforeEach(() => { + mockTransaction = mock(); + mockTransaction.evaluate.mockResolvedValue(mockPayload); + mockContract = mock(); + mockContract.createTransaction + .calledWith('txn') + .mockReturnValue(mockTransaction); + }); + + it('gets the result of evaluating a transaction', async () => { + const result = await evatuateTransaction( + mockContract, + 'txn', + 'arga', + 'argb' + ); + expect(result.toString()).toBe(mockPayload.toString()); + }); + + it.each([ + 'the asset GOCHAINCODE already exists', + 'Asset JAVACHAINCODE already exists', + 'The asset JSCHAINCODE already exists', + ])( + 'throws an AssetExistsError an asset already exists error occurs: %s', + async (msg) => { + mockTransaction.evaluate.mockRejectedValue(new Error(msg)); + + await expect(async () => { + await evatuateTransaction(mockContract, 'txn', 'arga', 'argb'); + }).rejects.toThrow(AssetExistsError); + } + ); + + it.each([ + 'the asset GOCHAINCODE does not exist', + 'Asset JAVACHAINCODE does not exist', + 'The asset JSCHAINCODE does not exist', + ])( + 'throws an AssetNotFoundError if an asset does not exist error occurs: %s', + async (msg) => { + mockTransaction.evaluate.mockRejectedValue(new Error(msg)); + + await expect(async () => { + await evatuateTransaction(mockContract, 'txn', 'arga', 'argb'); + }).rejects.toThrow(AssetNotFoundError); + } + ); + + it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => { + mockTransaction.evaluate.mockRejectedValue( + new Error( + 'Failed to get transaction with id txn, error Entry not found in index' + ) + ); + + await expect(async () => { + await evatuateTransaction(mockContract, 'txn', 'arga', 'argb'); + }).rejects.toThrow(TransactionNotFoundError); + }); + + it('throws a TransactionError for other errors', async () => { + mockTransaction.evaluate.mockRejectedValue(new Error('MOCK ERROR')); + + await expect(async () => { + await evatuateTransaction(mockContract, 'txn', 'arga', 'argb'); + }).rejects.toThrow(TransactionError); + }); + }); + + describe('submitTransaction', () => { + let redis: Redis; + const mockPayload = Buffer.from('MOCK PAYLOAD'); + let mockTransaction: MockProxy; + let mockContract: MockProxy; + + beforeEach(async () => { + const redisOptions = { + port: config.redisPort, + host: config.redisHost, + username: config.redisUsername, + password: config.redisPassword, + }; + + redis = new IORedis(redisOptions) as unknown as Redis; + + mockTransaction = mock(); + mockTransaction.submit.mockResolvedValue(mockPayload); + mockTransaction.getTransactionId.mockReturnValue('MOCK TXN ID'); + mockTransaction.serialize.mockReturnValue(Buffer.from('MOCK TXN STATE')); + mockContract = mock(); + mockContract.createTransaction + .calledWith('txn') + .mockReturnValue(mockTransaction); + }); + + it('gets the transaction ID of the submitted transaction', async () => { + const result = await submitTransaction( + mockContract, + redis, + 'mspid', + 'txn', + 'arga', + 'argb' + ); + expect(result).toBe('MOCK TXN ID'); + }); + + it.each([ + 'the asset GOCHAINCODE already exists', + 'Asset JAVACHAINCODE already exists', + 'The asset JSCHAINCODE already exists', + ])( + 'throws an AssetExistsError an asset already exists error occurs: %s', + async (msg) => { + mockTransaction.submit.mockRejectedValue(new Error(msg)); + + await expect(async () => { + await submitTransaction( + mockContract, + redis, + 'mspid', + 'txn', + 'arga', + 'argb' + ); + }).rejects.toThrow(AssetExistsError); + } + ); + + it.each([ + 'the asset GOCHAINCODE does not exist', + 'Asset JAVACHAINCODE does not exist', + 'The asset JSCHAINCODE does not exist', + ])( + 'throws an AssetNotFoundError if an asset does not exist error occurs: %s', + async (msg) => { + mockTransaction.submit.mockRejectedValue(new Error(msg)); + + await expect(async () => { + await submitTransaction( + mockContract, + redis, + 'mspid', + 'txn', + 'arga', + 'argb' + ); + }).rejects.toThrow(AssetNotFoundError); + } + ); + + it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => { + mockTransaction.submit.mockRejectedValue( + new Error( + 'Failed to get transaction with id txn, error Entry not found in index' + ) + ); + + await expect(async () => { + await submitTransaction( mockContract, redis, - transactionId, - savedTransaction + 'mspid', + 'txn', + 'arga', + 'argb' ); + }).rejects.toThrow(TransactionNotFoundError); + }); - expect(redis.del).toHaveBeenCalledTimes(1); - }); + it('throws a TransactionError for other errors', async () => { + mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); + + await expect(async () => { + await submitTransaction( + mockContract, + redis, + 'mspid', + 'txn', + 'arga', + 'argb' + ); + }).rejects.toThrow(TransactionError); + }); + }); + + describe('getBlockHeight', () => { + it('gets the current block height', async () => { + const mockBlockchainInfoProto = + fabricProtos.common.BlockchainInfo.create(); + mockBlockchainInfoProto.height = 42; + const mockBlockchainInfoBuffer = Buffer.from( + fabricProtos.common.BlockchainInfo.encode( + mockBlockchainInfoProto + ).finish() + ); + const mockContract = mock(); + mockContract.evaluateTransaction + .calledWith('GetChainInfo', 'mychannel') + .mockResolvedValue(mockBlockchainInfoBuffer); + + const result = (await getBlockHeight(mockContract)) as Long; + expect(result.toInt()).toStrictEqual(42); }); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/fabric.ts b/asset-transfer-basic/rest-api-typescript/src/fabric.ts index 44014ce4..363f9c72 100644 --- a/asset-transfer-basic/rest-api-typescript/src/fabric.ts +++ b/asset-transfer-basic/rest-api-typescript/src/fabric.ts @@ -20,8 +20,10 @@ import * as config from './config'; import { logger } from './logger'; import { storeTransactionDetails, + getRetryTransactionDetails, clearTransactionDetails, incrementRetryCount, + TransactionDetails, } from './redis'; import { AssetExistsError, @@ -115,52 +117,48 @@ export const getContracts = async ( return { assetContract, qsccContract }; }; -export const startRetryLoop = (contract: Contract, redis: Redis): void => { - setInterval( - async (redis) => { - try { - const pendingTransactionCount = await (redis as Redis).zcard( - 'index:txn:timestamp' - ); - logger.debug( - 'Transactions awaiting retry: %d', - pendingTransactionCount - ); - - // TODO pick a random transaction instead to reduce chances of - // clashing with other instances? Currently no zrandmember - // command though... - // https://github.com/luin/ioredis/issues/1374 - const transactionIds = await (redis as Redis).zrange( - 'index:txn:timestamp', - -1, - -1 - ); - - if (transactionIds.length > 0) { - const transactionId = transactionIds[0]; - const savedTransaction = await (redis as Redis).hgetall( - `txn:${transactionId}` +export const startRetryLoop = ( + contracts: Map, + redis: Redis +): void => { + const retryInterval = setInterval( + async (contracts, redis) => { + if (logger.isLevelEnabled('debug')) { + try { + const pendingTransactionCount = await (redis as Redis).zcard( + 'index:txn:timestamp' + ); + logger.debug( + '%d transactions awaiting retry', + pendingTransactionCount + ); + } catch (err) { + logger.warn({ err }, 'Error getting pending transaction count'); + } + } + + const savedTransaction = await getRetryTransactionDetails(redis); + + if (savedTransaction) { + const contract = contracts.get(savedTransaction.mspId); + + if (contract) { + await retryTransaction(contract, redis, savedTransaction); + } else { + logger.error( + 'No contract found for %s to retry transaction %s', + savedTransaction.mspId, + savedTransaction.transactionId ); - if (parseInt(savedTransaction.retries) >= config.maxRetryCount) { - await clearTransactionDetails(redis, transactionId); - } else { - await retryTransaction( - contract, - redis, - transactionId, - savedTransaction - ); - } } - } catch (err) { - // TODO just log? - logger.error(err, 'error getting saved transaction state'); } }, config.retryDelay, + contracts, redis ); + + retryInterval.unref(); }; export const evatuateTransaction = async ( @@ -173,7 +171,10 @@ export const evatuateTransaction = async ( try { const payload = await txn.evaluate(...transactionArgs); - logger.debug({ payload }, 'Evaluate transaction response received'); + logger.debug( + { transactionId: txnId, payload: payload.toString() }, + 'Evaluate transaction response received' + ); return payload; } catch (err) { throw handleError(txnId, err); @@ -183,6 +184,7 @@ export const evatuateTransaction = async ( export const submitTransaction = async ( contract: Contract, redis: Redis, + mspId: string, transactionName: string, ...transactionArgs: string[] ): Promise => { @@ -195,7 +197,14 @@ export const submitTransaction = async ( try { // Store the transaction details and set the event handler in case there // are problems later with commiting the transaction - await storeTransactionDetails(redis, txnId, txnState, txnArgs, timestamp); + await storeTransactionDetails( + redis, + txnId, + mspId, + txnState, + txnArgs, + timestamp + ); txn.setEventHandler(DefaultEventHandlerStrategies.NONE); await txn.submit(...transactionArgs); } catch (err) { @@ -212,6 +221,7 @@ export const submitTransaction = async ( // Unfortunately the chaincode samples do not use error codes, and the error // message text is not the same for each implementation +// TODO move to errors.ts? const handleError = (transactionId: string, err: Error): Error => { // This regex needs to match the following error messages: // "the asset %s already exists" @@ -266,40 +276,55 @@ const handleError = (transactionId: string, err: Error): Error => { return new TransactionError('Transaction error', transactionId); }; -export const retryTransaction = async ( +const retryTransaction = async ( contract: Contract, redis: Redis, - transactionId: string, - savedTransaction: Record + savedTransaction: TransactionDetails ): Promise => { - logger.debug('Retrying transaction %s', transactionId); + logger.debug('Retrying transaction %s', savedTransaction.transactionId); try { const transaction = contract.deserializeTransaction( - Buffer.from(savedTransaction.state) + savedTransaction.transactionState ); - const args: string[] = JSON.parse(savedTransaction.args); + const args: string[] = JSON.parse(savedTransaction.transactionArgs); - await transaction.submit(...args); - await clearTransactionDetails(redis, transactionId); + const payload = await transaction.submit(...args); + logger.debug( + { + transactionId: savedTransaction.transactionId, + payload: payload.toString(), + }, + 'Retry transaction response received' + ); + + await clearTransactionDetails(redis, savedTransaction.transactionId); } catch (err) { - if (isDuplicateTransaction(err)) { - logger.warn('Transaction %s has already been committed', transactionId); - await clearTransactionDetails(redis, transactionId); + if (isDuplicateTransactionError(err)) { + logger.warn( + 'Transaction %s has already been committed', + savedTransaction.transactionId + ); + await clearTransactionDetails(redis, savedTransaction.transactionId); } else { - // TODO check for retry limit and update timestamp logger.warn( err, 'Retry %d failed for transaction %s', savedTransaction.retries, - transactionId + savedTransaction.transactionId ); - await incrementRetryCount(redis, transactionId); + + if (savedTransaction.retries < config.maxRetryCount) { + await incrementRetryCount(redis, savedTransaction.transactionId); + } else { + await clearTransactionDetails(redis, savedTransaction.transactionId); + } } } }; -const isDuplicateTransaction = (error: { +// TODO move to errors.ts? +const isDuplicateTransactionError = (error: { errors: { endorsements: { details: string }[] }[]; }) => { // TODO this is horrible! Isn't it possible to check for TxValidationCode DUPLICATE_TXID somehow? diff --git a/asset-transfer-basic/rest-api-typescript/src/index.ts b/asset-transfer-basic/rest-api-typescript/src/index.ts index 722079c5..b8b13d69 100644 --- a/asset-transfer-basic/rest-api-typescript/src/index.ts +++ b/asset-transfer-basic/rest-api-typescript/src/index.ts @@ -2,24 +2,21 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { Contract, Network } from 'fabric-network'; +import { Network } from 'fabric-network'; import { Redis } from 'ioredis'; import * as config from './config'; -import { startRetryLoop, blockEventHandler } from './fabric'; +import { blockEventHandler } from './fabric'; import { logger } from './logger'; import { createServer } from './server'; async function main() { const app = await createServer(); - // TODO block listener and retry logic currently only handles a single org!!! - // TODO should these be initialised here? - const contract = app.get(config.mspIdOrg1).assetContract as Contract; + // TODO block listener currently only handles a single org!!! + // TODO should it be initialised here? const redis = app.get('redis') as Redis; const network = app.get('networkOrg1') as Network; - await network.addBlockListener(blockEventHandler(redis)); - startRetryLoop(contract, redis); app.listen(config.port, () => { logger.info('Express server started on port: %d', config.port); diff --git a/asset-transfer-basic/rest-api-typescript/src/logger.ts b/asset-transfer-basic/rest-api-typescript/src/logger.ts index fe53a01e..1f1cea83 100644 --- a/asset-transfer-basic/rest-api-typescript/src/logger.ts +++ b/asset-transfer-basic/rest-api-typescript/src/logger.ts @@ -1,3 +1,7 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + import pino from 'pino'; import * as config from './config'; diff --git a/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts b/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts index 7d81cb99..8b4f291c 100644 --- a/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts @@ -1,75 +1,184 @@ -import IORedis from './__mocks__/IORedis'; +/* + * SPDX-License-Identifier: Apache-2.0 + */ + import * as config from './config'; -import { Redis } from 'ioredis'; +import IORedis, { Redis } from 'ioredis'; import { clearTransactionDetails, incrementRetryCount, storeTransactionDetails, + getTransactionDetails, + getRetryTransactionDetails, } from './redis'; -jest.mock('ioredis'); +jest.mock('ioredis', () => require('ioredis-mock/jest')); jest.mock('./config'); -const redisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, -}; -const redis = new IORedis(redisOptions) as unknown as Redis; -describe('Testing increment retries ', () => { - const transactionId = +describe('Redis', () => { + let redis: Redis; + + const mockTransactionId = '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - it('Should increment retries for valid transction id', async () => { - await incrementRetryCount(redis, transactionId); - expect(redis.hincrby).toHaveBeenCalledTimes(1); + const mockKey = `txn:${mockTransactionId}`; + const mockMspId = 'Org1MSP'; + const mockState = Buffer.from( + `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}` + ); + const mockArgs = '["test111","red",400,"Jean",101]'; + const mockTimestamp = 1628078044362; + + const addMockTransationDetails = async (redis: Redis) => { + await redis + .multi() + .hset( + mockKey, + 'mspId', + mockMspId, + 'state', + mockState, + 'args', + mockArgs, + 'timestamp', + mockTimestamp, + 'retries', + '0' + ) + .zadd('index:txn:timestamp', mockTimestamp, mockTransactionId) + .exec(); + }; + + beforeEach(async () => { + const redisOptions = { + port: config.redisPort, + host: config.redisHost, + username: config.redisUsername, + password: config.redisPassword, + }; + + redis = new IORedis(redisOptions) as unknown as Redis; + }); + describe('storeTransactionDetails', () => { + it('stores transaction details as a hash', async () => { + await storeTransactionDetails( + redis, + mockTransactionId, + mockMspId, + mockState, + mockArgs, + mockTimestamp + ); + + const storedTransaction = await redis.hgetall(mockKey); + const expectedTransaction = { + mspId: mockMspId, + state: mockState, + args: mockArgs, + retries: '0', + timestamp: mockTimestamp.toString(), + }; + expect(storedTransaction).toStrictEqual(expectedTransaction); + }); + + it('adds the transaction ID to the sorted set timestamp index', async () => { + await storeTransactionDetails( + redis, + mockTransactionId, + mockMspId, + mockState, + mockArgs, + mockTimestamp + ); + + const index = await redis.zrange('index:txn:timestamp', 0, -1); + expect(index).toStrictEqual([mockTransactionId]); + }); + + // TODO this seems to work for spying/mocking... + // jest.spyOn(redis, 'multi').mock... + // but haven't worked out how to spy on the hset, zadd, exec in that chain + // Ask Mark? + it.todo('handles an error from redis'); }); - it('Should not increment retries for empty transaction id ', async () => { - await incrementRetryCount(redis, ''); - expect(redis.hincrby).toHaveBeenCalledTimes(0); - }); -}); + describe('getTransactionDetails', () => { + it('gets the transaction details from a hash', async () => { + await addMockTransationDetails(redis); -describe('Testing storeTransactionDetails ', () => { - const args = '["test111","red",400,"Jean",101]'; - const timestamp = 1628078044362; - it('Should store details for valid transction Id', async () => { - const transactionId = - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; - await storeTransactionDetails( - redis, - transactionId, - Buffer.from(state), - args, - timestamp + const details = await getTransactionDetails(redis, mockTransactionId); + + expect(details).toStrictEqual({ + transactionId: mockTransactionId, + mspId: mockMspId, + transactionState: mockState, + transactionArgs: mockArgs, + retries: 0, + timestamp: mockTimestamp, + }); + }); + + it.todo('handles an error from redis'); + }); + + describe('getRetryTransactionDetails', () => { + it('gets the oldest transaction details from a hash', async () => { + await addMockTransationDetails(redis); + + const details = await getRetryTransactionDetails(redis); + + expect(details).toStrictEqual({ + transactionId: mockTransactionId, + mspId: mockMspId, + transactionState: mockState, + transactionArgs: mockArgs, + retries: 0, + timestamp: mockTimestamp, + }); + }); + + it('gets undefined if there are no transactions to retry', async () => { + const details = await getRetryTransactionDetails(redis); + + expect(details).toBeUndefined(); + }); + + it.todo('handles an error from redis'); + }); + + describe('clearTransactionDetails', () => { + it('removes the transaction details hash', async () => { + await addMockTransationDetails(redis); + + await clearTransactionDetails(redis, mockTransactionId); + + const storedTransaction = await redis.hgetall(mockKey); + expect(storedTransaction).not.toHaveProperty('state'); + }); + + it('removes the transaction ID from the sorted set timestamp index', async () => { + await addMockTransationDetails(redis); + + await clearTransactionDetails(redis, mockTransactionId); + + const index = await redis.zrange('index:txn:timestamp', 0, -1); + expect(index).toStrictEqual([]); + }); + }); + + describe('incrementRetryCount', () => { + it('increments the retries value in the transction details hash', async () => { + await addMockTransationDetails(redis); + + await incrementRetryCount(redis, mockTransactionId); + + const retries = await redis.hget(mockKey, 'retries'); + expect(retries).toBe('1'); + }); + + it.todo( + 'updates the position of the transaction ID in the sorted set timestamp index' ); - expect(redis.hset).toHaveBeenCalledTimes(1); - expect(redis.zadd).toHaveBeenCalledTimes(1); - }); - it('Should not store details for empty transction Id', async () => { - const transactionId = ''; - const state = `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${transactionId}`; - await storeTransactionDetails( - redis, - transactionId, - Buffer.from(state), - args, - timestamp - ); - expect(redis.hset).toHaveBeenCalledTimes(0); - expect(redis.zadd).toHaveBeenCalledTimes(0); - }); -}); - -describe('Testing clearTransactionDetails ', () => { - it('Should clear details ', async () => { - const transactionId = - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - await clearTransactionDetails(redis, transactionId); - expect(redis.del).toHaveBeenCalledTimes(1); - expect(redis.zrem).toHaveBeenCalledTimes(1); + it.todo('handles an error from redis'); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/redis.ts b/asset-transfer-basic/rest-api-typescript/src/redis.ts index c4e8b7c3..fc89fa9b 100644 --- a/asset-transfer-basic/rest-api-typescript/src/redis.ts +++ b/asset-transfer-basic/rest-api-typescript/src/redis.ts @@ -1,5 +1,12 @@ /* * SPDX-License-Identifier: Apache-2.0 + * + * This sample includes basic retry logic so it needs somewhere to store + * transaction details in case the app restarts for any reason, and Redis is + * just one of the options available + * + * Note: This implementation is not designed with multiple instances of the + * REST app in mind, which is likely to be required in a production environment */ import IORedis, { Redis, RedisOptions } from 'ioredis'; @@ -16,29 +23,44 @@ const redisOptions: RedisOptions = { export const redis = new IORedis(redisOptions); +export type TransactionDetails = { + transactionId: string; + mspId: string; + transactionState: Buffer; + transactionArgs: string; + timestamp: number; + retries: number; +}; + +/* + * Store enough information in order to resubmit a transaction + */ export const storeTransactionDetails = async ( redis: Redis, transactionId: string, + mspId: string, transactionState: Buffer, transactionArgs: string, timestamp: number ): Promise => { try { - if (transactionId.length === 0) { - throw new Error('Empty transaction Id found'); - } const key = `txn:${transactionId}`; logger.debug( - 'Storing transaction details. Key: %s State: %s Args: %s Timestamp: %d', - key, - transactionState, - transactionArgs, - timestamp + { + key, + mspId, + transactionState, + transactionArgs, + timestamp, + }, + 'Storing transaction details' ); await redis .multi() .hset( key, + 'mspId', + mspId, 'state', transactionState, 'args', @@ -51,14 +73,84 @@ export const storeTransactionDetails = async ( .zadd('index:txn:timestamp', timestamp, transactionId) .exec(); } catch (err) { + // TODO just log?! logger.error( - err, - 'Error storing transaction details. ID %s', + { err }, + 'Error storing details for transaction ID %s', transactionId ); } }; +/* + * Get the information required to resubmit a transaction + */ +export const getTransactionDetails = async ( + redis: Redis, + transactionId: string +): Promise => { + try { + const savedTransaction = await (redis as Redis).hgetall( + `txn:${transactionId}` + ); + logger.debug( + { transactionId: transactionId, state: savedTransaction }, + 'Got transaction details' + ); + + const transactionDetails = { + transactionId: transactionId, + mspId: savedTransaction.mspId, + transactionState: Buffer.from(savedTransaction.state), + transactionArgs: savedTransaction.args, + timestamp: parseInt(savedTransaction.timestamp), + retries: parseInt(savedTransaction.retries), + }; + return transactionDetails; + } catch (err) { + // TODO just log?! + logger.error( + { err }, + 'Error getting details for transaction ID %s', + transactionId + ); + } +}; + +/* + * Get the oldest transaction details + */ +export const getRetryTransactionDetails = async ( + redis: Redis +): Promise => { + try { + const transactionIds = await (redis as Redis).zrange( + 'index:txn:timestamp', + -1, + -1 + ); + + if (transactionIds.length > 0) { + const transactionId = transactionIds[0]; + + const savedTransaction = await getTransactionDetails( + redis, + transactionId + ); + return savedTransaction; + } + } catch (err) { + // TODO just log?! + logger.error( + { err }, + 'Error getting details for next transaction to retry' + ); + } +}; + +/* + * Delete transaction details + */ export const clearTransactionDetails = async ( redis: Redis, transactionId: string @@ -72,16 +164,20 @@ export const clearTransactionDetails = async ( .zrem('index:txn:timestamp', transactionId) .exec(); } catch (err) { + // TODO just log?! logger.error( - err, - 'Error remove saved transaction state for transaction ID %s', + { err }, + 'Error remove details for transaction ID %s', transactionId ); } }; -// TODO add getTransaction etc. helpers? +/* + * Increment the number of times the transaction has been retried + * TODO needs to update the timestamp and index as well + */ export const incrementRetryCount = async ( redis: Redis, transactionId: string @@ -89,11 +185,9 @@ export const incrementRetryCount = async ( const key = `txn:${transactionId}`; logger.debug('Incrementing retries fortransaction Key: %s', key); try { - if (transactionId.length === 0) { - throw new Error('Empty transaction Id found'); - } await (redis as Redis).hincrby(`txn:${transactionId}`, 'retries', 1); } catch (err) { + // TODO just log?! logger.error( err, 'Error incrementing retries for transaction ID %s', diff --git a/asset-transfer-basic/rest-api-typescript/src/server.ts b/asset-transfer-basic/rest-api-typescript/src/server.ts index 14674a1c..e9e57e96 100644 --- a/asset-transfer-basic/rest-api-typescript/src/server.ts +++ b/asset-transfer-basic/rest-api-typescript/src/server.ts @@ -6,6 +6,7 @@ import helmet from 'helmet'; import { StatusCodes, getReasonPhrase } from 'http-status-codes'; import express, { Application, NextFunction, Request, Response } from 'express'; import pinoMiddleware from 'pino-http'; +import { Contract } from 'fabric-network'; import { logger } from './logger'; import { assetsRouter } from './assets.router'; @@ -16,6 +17,7 @@ import { getNetwork, createGateway, createWallet, + startRetryLoop, } from './fabric'; import { redis } from './redis'; import * as config from './config'; @@ -91,6 +93,11 @@ export const createServer = async (): Promise => { const contractsOrg2 = await getContracts(networkOrg2); app.set(config.mspIdOrg2, contractsOrg2); + const assetContracts = new Map(); + assetContracts.set(config.mspIdOrg1, contractsOrg1.assetContract); + assetContracts.set(config.mspIdOrg2, contractsOrg2.assetContract); + startRetryLoop(assetContracts, redis); + app.set('redis', redis); app.use('/', healthRouter); diff --git a/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts b/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts index 27f6438b..a91c2fc5 100644 --- a/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts +++ b/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts @@ -7,6 +7,7 @@ import { Contract } from 'fabric-network'; import { protos } from 'fabric-protos'; import { getReasonPhrase, StatusCodes } from 'http-status-codes'; import { Redis } from 'ioredis'; +import { getTransactionDetails } from './redis'; import { evatuateTransaction } from './fabric'; import { logger } from './logger'; import * as config from './config'; @@ -33,18 +34,14 @@ transactionsRouter.get( const redis = req.app.get('redis') as Redis; try { - const savedTransaction = await (redis as Redis).hgetall( - `txn:${transactionId}` - ); - logger.debug( - { transactionId: transactionId, state: savedTransaction }, - 'Saved transaction state' + const savedTransaction = await getTransactionDetails( + redis, + transactionId ); - if (savedTransaction.state) { + if (savedTransaction?.transactionState) { foundTransaction = true; - const retries = parseInt(savedTransaction.retries); - if (retries > 0) { + if (savedTransaction.retries > 0) { progress = 'RETRYING'; } else { progress = 'ACCEPTED';