From 5d92abc52d9a4467f09336fda229df8ede396f68 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Wed, 8 Dec 2021 16:58:27 +0000 Subject: [PATCH] Clarify retry logic Improve the separation between Fabric logic and the job queue implementation details Signed-off-by: James Taylor --- .../rest-api-typescript/README.md | 2 +- .../rest-api-typescript/src/config.spec.ts | 6 +- .../rest-api-typescript/src/config.ts | 6 +- .../rest-api-typescript/src/errors.spec.ts | 116 +++++++++++++- .../rest-api-typescript/src/errors.ts | 141 ++++++++++++------ .../rest-api-typescript/src/fabric.spec.ts | 129 ---------------- .../rest-api-typescript/src/fabric.ts | 125 +--------------- .../rest-api-typescript/src/index.ts | 23 ++- .../rest-api-typescript/src/jobs.router.ts | 3 +- .../rest-api-typescript/src/jobs.spec.ts | 131 +++++++++++++++- .../rest-api-typescript/src/jobs.ts | 125 +++++++++++++++- .../rest-api-typescript/src/redis.ts | 2 +- 12 files changed, 486 insertions(+), 323 deletions(-) diff --git a/asset-transfer-basic/rest-api-typescript/README.md b/asset-transfer-basic/rest-api-typescript/README.md index ceb689a9..fda1e708 100644 --- a/asset-transfer-basic/rest-api-typescript/README.md +++ b/asset-transfer-basic/rest-api-typescript/README.md @@ -51,7 +51,7 @@ Create a `.env` file to configure the server for the test network (make sure TES TEST_NETWORK_HOME=$HOME/fabric-samples/test-network npm run generateEnv ``` -Start a Redis server +Start a Redis server (Redis is used to store the queue of submit transactions) ```shell npm run start:redis diff --git a/asset-transfer-basic/rest-api-typescript/src/config.spec.ts b/asset-transfer-basic/rest-api-typescript/src/config.spec.ts index f095c1b5..96ffaad5 100644 --- a/asset-transfer-basic/rest-api-typescript/src/config.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/config.spec.ts @@ -289,9 +289,9 @@ describe('Config values', () => { }); describe('commitTimeout', () => { - it('defaults to "3000"', () => { + it('defaults to "300"', () => { const config = require('./config'); - expect(config.commitTimeout).toBe(3000); + expect(config.commitTimeout).toBe(300); }); it('can be configured using the "HLF_COMMIT_TIMEOUT" environment variable', () => { @@ -305,7 +305,7 @@ describe('Config values', () => { expect(() => { require('./config'); }).toThrow( - 'env-var: "HLF_COMMIT_TIMEOUT" should be a valid integer. An example of a valid value would be: 3000' + 'env-var: "HLF_COMMIT_TIMEOUT" should be a valid integer. An example of a valid value would be: 300' ); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/config.ts b/asset-transfer-basic/rest-api-typescript/src/config.ts index 46933719..87b64719 100644 --- a/asset-transfer-basic/rest-api-typescript/src/config.ts +++ b/asset-transfer-basic/rest-api-typescript/src/config.ts @@ -3,12 +3,12 @@ * * The sample REST server can be configured using the environment variables * documented below - * + * * In a local development environment, these variables can be loaded from a * .env file by starting the server with the following command: - * + * * npm start:dev - * + * * The scripts/generateEnv.sh script can be used to generate a suitable .env * file for the Fabric Test Network */ diff --git a/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts b/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts index 6ef490f6..90f37a1c 100644 --- a/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts @@ -2,15 +2,20 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { TimeoutError, TransactionError } from 'fabric-network'; import { AssetExistsError, AssetNotFoundError, TransactionNotFoundError, + getRetryAction, handleError, isDuplicateTransactionError, isErrorLike, + RetryAction, } from './errors'; +import { mock } from 'jest-mock-extended'; + describe('Errors', () => { describe('isErrorLike', () => { it('returns false for null', () => { @@ -53,6 +58,24 @@ describe('Errors', () => { }); describe('isDuplicateTransactionError', () => { + it('returns true for a TransactionError with a transaction code of DUPLICATE_TXID', () => { + const mockDuplicateTransactionError = mock(); + mockDuplicateTransactionError.transactionCode = 'DUPLICATE_TXID'; + + expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe( + true + ); + }); + + it('returns false for a TransactionError without a transaction code of MVCC_READ_CONFLICT', () => { + const mockDuplicateTransactionError = mock(); + mockDuplicateTransactionError.transactionCode = 'MVCC_READ_CONFLICT'; + + expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe( + false + ); + }); + it('returns true for an error when all endorsement details are duplicate transaction found', () => { const mockDuplicateTransactionError = { errors: [ @@ -155,13 +178,96 @@ describe('Errors', () => { }); }); + describe('getRetryAction', () => { + it('returns RetryAction.None for duplicate transaction errors', () => { + const mockDuplicateTransactionError = { + errors: [ + { + endorsements: [ + { + details: 'duplicate transaction found', + }, + { + details: 'duplicate transaction found', + }, + { + details: 'duplicate transaction found', + }, + ], + }, + ], + }; + + expect(getRetryAction(mockDuplicateTransactionError)).toBe( + RetryAction.None + ); + }); + + it('returns RetryAction.None for a TransactionNotFoundError', () => { + const mockTransactionNotFoundError = new TransactionNotFoundError('Failed to get transaction with id txn, error Entry not found in index', 'txn1'); + + expect(getRetryAction(mockTransactionNotFoundError)).toBe( + RetryAction.None + ); + }); + + it('returns RetryAction.None for an AssetExistsError', () => { + const mockAssetExistsError = new AssetExistsError('The asset MOCK_ASSET already exists', 'txn1'); + + expect(getRetryAction(mockAssetExistsError)).toBe( + RetryAction.None + ); + }); + + it('returns RetryAction.None for an AssetNotFoundError', () => { + const mockAssetNotFoundError = new AssetNotFoundError('the asset MOCK_ASSET does not exist', 'txn1'); + + expect(getRetryAction(mockAssetNotFoundError)).toBe( + RetryAction.None + ); + }); + + it('returns RetryAction.WithExistingTransactionId for a TimeoutError', () => { + const mockTimeoutError = new TimeoutError('MOCK TIMEOUT ERROR'); + + expect(getRetryAction(mockTimeoutError)).toBe( + RetryAction.WithExistingTransactionId + ); + }); + + it('returns RetryAction.WithNewTransactionId for an MVCC_READ_CONFLICT TransactionError', () => { + const mockTransactionError = mock(); + mockTransactionError.transactionCode = 'MVCC_READ_CONFLICT'; + + expect(getRetryAction(mockTransactionError)).toBe( + RetryAction.WithNewTransactionId + ); + }); + + it('returns RetryAction.WithNewTransactionId for an Error', () => { + const mockError = new Error('MOCK ERROR'); + + expect(getRetryAction(mockError)).toBe( + RetryAction.WithNewTransactionId + ); + }); + + it('returns RetryAction.WithNewTransactionId for a string error', () => { + const mockError = 'MOCK ERROR'; + + expect(getRetryAction(mockError)).toBe( + RetryAction.WithNewTransactionId + ); + }); + }); + describe('handleError', () => { it.each([ 'the asset GOCHAINCODE already exists', 'Asset JAVACHAINCODE already exists', 'The asset JSCHAINCODE already exists', ])( - 'returns an AssetExistsError for errors with an asset already exists message: %s', + 'returns a AssetExistsError for errors with an asset already exists message: %s', (msg) => { expect(handleError('txn1', new Error(msg))).toStrictEqual( new AssetExistsError(msg, 'txn1') @@ -174,7 +280,7 @@ describe('Errors', () => { 'Asset JAVACHAINCODE does not exist', 'The asset JSCHAINCODE does not exist', ])( - 'returns an AssetNotFoundError for errors with an asset does not exist message: %s', + 'returns a AssetNotFoundError for errors with an asset does not exist message: %s', (msg) => { expect(handleError('txn1', new Error(msg))).toStrictEqual( new AssetNotFoundError(msg, 'txn1') @@ -200,10 +306,8 @@ describe('Errors', () => { ); }); - it('returns a new Error object for errors of other types', () => { - expect(handleError('txn1', 42)).toStrictEqual( - new Error('Unhandled error: 42') - ); + it('returns the original error for errors of other types', () => { + expect(handleError('txn1', 42)).toEqual(42); }); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/errors.ts b/asset-transfer-basic/rest-api-typescript/src/errors.ts index 89eaaa6b..66272e87 100644 --- a/asset-transfer-basic/rest-api-typescript/src/errors.ts +++ b/asset-transfer-basic/rest-api-typescript/src/errors.ts @@ -1,9 +1,18 @@ /* * SPDX-License-Identifier: Apache-2.0 + * + * This file contains all the error handling for Fabric transactions, including + * whether a transaction should be retried. */ +import { TimeoutError, TransactionError } from 'fabric-network'; import { logger } from './logger'; +/* + * Base type for errors from the smart contract. + * + * These errors will not be retried. + */ export class ContractError extends Error { transactionId: string; @@ -16,18 +25,23 @@ export class ContractError extends Error { } } +/* + * Represents the error which occurs when the transaction being submitted or + * evaluated is not implemented in a smart contract. + */ export class TransactionNotFoundError extends ContractError { - transactionId: string; - constructor(message: string, transactionId: string) { super(message, transactionId); Object.setPrototypeOf(this, TransactionNotFoundError.prototype); this.name = 'TransactionNotFoundError'; - this.transactionId = transactionId; } } +/* + * Represents the error which occurs in the basic asset transfer smart contract + * implementation when an asset already exists. + */ export class AssetExistsError extends ContractError { constructor(message: string, transactionId: string) { super(message, transactionId); @@ -37,6 +51,10 @@ export class AssetExistsError extends ContractError { } } +/* + * Represents the error which occurs in the basic asset transfer smart contract + * implementation when an asset does not exist. + */ export class AssetNotFoundError extends ContractError { constructor(message: string, transactionId: string) { super(message, transactionId); @@ -46,18 +64,53 @@ export class AssetNotFoundError extends ContractError { } } -export class JobNotFoundError extends Error { - jobId: string; - - constructor(message: string, jobId: string) { - super(message); - Object.setPrototypeOf(this, JobNotFoundError.prototype); - - this.name = 'JobNotFoundError'; - this.jobId = jobId; - } +/* + * Enumeration of possible retry actions. + * + * WithExistingTransactionId - transactions should be retried using the same + * transaction ID to protect against duplicate transactions being committed if + * a timeout error occurs + * + * WithNewTransactionId - transactions which could not be committed due to + * other errors require a new transaction ID when retrying + * + * None - transactions that failed due to a duplicate transaction error, or + * errors from the smart contract, should not be retried + */ +export enum RetryAction { + WithExistingTransactionId, + WithNewTransactionId, + None, } +/* + * Get the required transaction retry action for an error. + * + * For this sample transactions are considered retriable if they fail with any + * error, *except* for duplicate transaction errors, or errors from the smart + * contract. + * + * You might decide to retry transactions which fail with specific errors + * instead, for example: + * MVCC_READ_CONFLICT + * PHANTOM_READ_CONFLICT + * ENDORSEMENT_POLICY_FAILURE + * CHAINCODE_VERSION_CONFLICT + * EXPIRED_CHAINCODE + */ +export const getRetryAction = (err: unknown): RetryAction => { + if (isDuplicateTransactionError(err) || err instanceof ContractError) { + return RetryAction.None; + } else if (err instanceof TimeoutError) { + return RetryAction.WithExistingTransactionId; + } + + return RetryAction.WithNewTransactionId; +}; + +/* + * Type guard to make catching unknown errors easier + */ export const isErrorLike = (err: unknown): err is Error => { return ( err != undefined && @@ -72,23 +125,32 @@ export const isErrorLike = (err: unknown): err is Error => { /* * Checks whether an error was caused by a duplicate transaction. * - * Checking error strings like this is not ideal, unfortunately it appears to - * be the only option. In this case it would be better to check for the - * DUPLICATE_TXID TxValidationCode somehow but that does not seem to be - * possible. + * This is ...painful. */ export const isDuplicateTransactionError = (err: unknown): boolean => { + logger.debug({ err }, 'Checking for duplicate transaction error'); + if (err === undefined || err === null) return false; - const endorsementError = err as { - errors: { endorsements: { details: string }[] }[]; - }; + let isDuplicate; + if (typeof (err as TransactionError).transactionCode === 'string') { + // Checking whether a commit failure is caused by a duplicate transaction + // is straightforward because the transaction code should be available + isDuplicate = + (err as TransactionError).transactionCode === 'DUPLICATE_TXID'; + } else { + // Checking whether an endorsement failure is caused by a duplicate + // transaction is only possible by processing error strings, which is not ideal. + const endorsementError = err as { + errors: { endorsements: { details: string }[] }[]; + }; - const isDuplicate = endorsementError?.errors?.some((err) => - err?.endorsements?.some((endorsement) => - endorsement?.details?.startsWith('duplicate transaction found') - ) - ); + isDuplicate = endorsementError?.errors?.some((err) => + err?.endorsements?.some((endorsement) => + endorsement?.details?.startsWith('duplicate transaction found') + ) + ); + } return isDuplicate === true; }; @@ -167,27 +229,18 @@ const matchTransactionDoesNotExistMessage = ( return null; }; -export const isContractError = (err: unknown): boolean => { - if ( - err instanceof AssetExistsError || - err instanceof AssetNotFoundError || - err instanceof TransactionNotFoundError - ) { - return true; - } - - return false; -}; - /* * Handles errors from evaluating and submitting transactions. * - * As with duplicate transaction errors, checking error strings like this is - * not ideal. Unfortunately the chaincode samples do not use error codes so - * again it's the only option. The error message text is not even the same for - * the Go, Java, and Javascript implementations of the chaincode! + * Smart contract errors from the the basic asset transfer samples do not use + * error codes so matching strings is the only option, which is not ideal. + * Note: the error message text is not the same for the Go, Java, and + * Javascript implementations of the chaincode! */ -export const handleError = (transactionId: string, err: unknown): Error => { +export const handleError = ( + transactionId: string, + err: unknown +): Error | unknown => { logger.debug({ transactionId: transactionId, err }, 'Processing error'); if (isErrorLike(err)) { @@ -210,9 +263,7 @@ export const handleError = (transactionId: string, err: unknown): Error => { transactionId ); } - - return err; } - return new Error(`Unhandled error: ${err}`); + return err; }; diff --git a/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts b/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts index 34e6f2f0..b7ba2805 100644 --- a/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts @@ -11,7 +11,6 @@ import { submitTransaction, getBlockHeight, getTransactionValidationCode, - processSubmitTransactionJob, } from './fabric'; import * as config from './config'; @@ -34,7 +33,6 @@ import * as fabricProtos from 'fabric-protos'; import { MockProxy, mock } from 'jest-mock-extended'; import Long from 'long'; -import { Job } from 'bullmq'; jest.mock('./config'); jest.mock('fabric-network', () => { @@ -117,133 +115,6 @@ describe('Fabric', () => { }); }); - describe('processSubmitTransactionJob', () => { - const mockContracts = new Map(); - const mockPayload = Buffer.from('MOCK PAYLOAD'); - const mockSavedState = Buffer.from('MOCK SAVED STATE'); - let mockTransaction: MockProxy; - let mockContract: MockProxy; - let mockJob: MockProxy; - - beforeEach(() => { - mockTransaction = mock(); - mockTransaction.getTransactionId.mockReturnValue('mockTransactionId'); - - mockContract = mock(); - mockContract.createTransaction - .calledWith('txn') - .mockReturnValue(mockTransaction); - mockContract.deserializeTransaction - .calledWith(mockSavedState) - .mockReturnValue(mockTransaction); - mockContracts.set('mockMspid', mockContract); - - mockJob = mock(); - }); - - it('gets job result with no error or payload if no contract is available for the required mspid', async () => { - mockJob.data = { - mspid: 'missingMspid', - }; - - const jobResult = await processSubmitTransactionJob( - mockContracts, - mockJob - ); - - expect(jobResult).toStrictEqual({ - transactionError: undefined, - transactionPayload: undefined, - }); - }); - - it('gets a job result containing a payload if the transaction was successful first time', async () => { - mockJob.data = { - mspid: 'mockMspid', - transactionName: 'txn', - transactionArgs: ['arg1', 'arg2'], - }; - mockTransaction.submit - .calledWith('arg1', 'arg2') - .mockResolvedValue(mockPayload); - - const jobResult = await processSubmitTransactionJob( - mockContracts, - mockJob - ); - - expect(jobResult).toStrictEqual({ - transactionError: undefined, - transactionPayload: Buffer.from('MOCK PAYLOAD'), - }); - }); - - it('gets a job result containing a payload if the transaction was successfully rerun using saved transaction state', async () => { - mockJob.data = { - mspid: 'mockMspid', - transactionName: 'txn', - transactionArgs: ['arg1', 'arg2'], - transactionState: mockSavedState, - }; - mockTransaction.submit - .calledWith('arg1', 'arg2') - .mockResolvedValue(mockPayload); - - const jobResult = await processSubmitTransactionJob( - mockContracts, - mockJob - ); - - expect(jobResult).toStrictEqual({ - transactionError: undefined, - transactionPayload: Buffer.from('MOCK PAYLOAD'), - }); - }); - - it('gets a job result containing an error message if the transaction fails but cannot be retried', async () => { - mockJob.data = { - mspid: 'mockMspid', - transactionName: 'txn', - transactionArgs: ['arg1', 'arg2'], - transactionState: mockSavedState, - }; - mockTransaction.submit - .calledWith('arg1', 'arg2') - .mockRejectedValue( - new Error( - 'Failed to get transaction with id txn, error Entry not found in index' - ) - ); - - const jobResult = await processSubmitTransactionJob( - mockContracts, - mockJob - ); - - expect(jobResult).toStrictEqual({ - transactionError: - 'TransactionNotFoundError: Failed to get transaction with id txn, error Entry not found in index', - transactionPayload: undefined, - }); - }); - - it('throws an error if the transaction fails but can be retried', async () => { - mockJob.data = { - mspid: 'mockMspid', - transactionName: 'txn', - transactionArgs: ['arg1', 'arg2'], - transactionState: mockSavedState, - }; - mockTransaction.submit - .calledWith('arg1', 'arg2') - .mockRejectedValue(new Error('MOCK ERROR')); - - await expect(async () => { - await processSubmitTransactionJob(mockContracts, mockJob); - }).rejects.toThrow('MOCK ERROR'); - }); - }); - describe('evatuateTransaction', () => { const mockPayload = Buffer.from('MOCK PAYLOAD'); let mockTransaction: MockProxy; diff --git a/asset-transfer-basic/rest-api-typescript/src/fabric.ts b/asset-transfer-basic/rest-api-typescript/src/fabric.ts index 70a3a6f2..1a9f8b6e 100644 --- a/asset-transfer-basic/rest-api-typescript/src/fabric.ts +++ b/asset-transfer-basic/rest-api-typescript/src/fabric.ts @@ -10,20 +10,13 @@ import { GatewayOptions, Wallets, Network, - TimeoutError, Transaction, Wallet, } from 'fabric-network'; import * as config from './config'; import { logger } from './logger'; -import { - handleError, - isContractError, - isDuplicateTransactionError, -} from './errors'; +import { handleError } from './errors'; import * as protos from 'fabric-protos'; -import { Job } from 'bullmq'; -import { JobData, JobResult, updateJobData } from './jobs'; /* * Creates an in memory wallet to hold credentials for an Org1 and Org2 user @@ -120,122 +113,6 @@ export const getContracts = async ( return { assetContract, qsccContract }; }; -/* - * Process a submit transaction request from the job queue - * - * For this sample transactions are retried if they fail with any error, - * except for errors from the smart contract, or duplicate transaction - * errors - * - * You might decide to retry transactions which fail with specific errors - * instead, for example: - * MVCC_READ_CONFLICT - * PHANTOM_READ_CONFLICT - * ENDORSEMENT_POLICY_FAILURE - * CHAINCODE_VERSION_CONFLICT - * EXPIRED_CHAINCODE - */ -export const processSubmitTransactionJob = async ( - contracts: Map, - job: Job -): Promise => { - logger.debug({ jobId: job.id, jobName: job.name }, 'Processing job'); - - const contract = contracts.get(job.data.mspid); - if (contract === undefined) { - logger.error( - { jobId: job.id, jobName: job.name }, - 'Contract not found for MSP ID %s', - job.data.mspid - ); - - // Retrying will not work, so give up with an unsuccessful result - return { - transactionError: undefined, - transactionPayload: undefined, - }; - } - - let transaction: Transaction; - if (job.data.transactionState) { - const savedState = job.data.transactionState; - logger.debug( - { - jobId: job.id, - jobName: job.name, - savedState, - }, - 'Using previously saved transaction state' - ); - - transaction = contract.deserializeTransaction(savedState); - } else { - logger.debug( - { - jobId: job.id, - jobName: job.name, - }, - 'Using new transaction' - ); - - transaction = contract.createTransaction(job.data.transactionName); - await updateJobData(job, transaction); - } - - try { - logger.debug( - { - jobId: job.id, - jobName: job.name, - transactionId: transaction.getTransactionId(), - }, - 'Submitting transaction' - ); - const args = job.data.transactionArgs; - const payload = await submitTransaction(transaction, ...args); - - return { - transactionError: undefined, - transactionPayload: payload, - }; - } catch (err) { - if ( - err instanceof Error && - (isContractError(err) || isDuplicateTransactionError(err)) - ) { - logger.error( - { jobId: job.id, jobName: job.name, err }, - 'Fatal transaction error occurred' - ); - - // Return a job result to stop retrying - return { - transactionError: err.toString(), - transactionPayload: undefined, - }; - } else { - logger.warn( - { jobId: job.id, jobName: job.name, err }, - 'Retryable transaction error occurred' - ); - - // The original transaction may eventually get committed in the case of - // a timeout error, so keep the same transaction ID to protect against - // unintended duplicate transactions - if (!(err instanceof TimeoutError)) { - logger.debug( - { jobId: job.id, jobName: job.name }, - 'Clearing saved transaction state' - ); - await updateJobData(job, undefined); - } - - // Rethrow the error to keep retrying - throw err; - } - } -}; - /* * Evaluate a transaction and handle any errors */ diff --git a/asset-transfer-basic/rest-api-typescript/src/index.ts b/asset-transfer-basic/rest-api-typescript/src/index.ts index 6b81651c..1e222713 100644 --- a/asset-transfer-basic/rest-api-typescript/src/index.ts +++ b/asset-transfer-basic/rest-api-typescript/src/index.ts @@ -4,11 +4,32 @@ * This is the main entrypoint for the sample REST server, which is responsible * for connecting to the Fabric network and setting up a job queue for * processing submit transactions + * + * You can find more details related to the Fabric aspects of the sample in the + * following files: + * + * - errors.ts + * Fabric transaction error handling and retry logic + * - fabric.ts + * all the sample code which interacts with the Fabric SDK * - * You can find details of other aspects of the sample in the following files: + * The remaining files are related to the REST server aspects of the sample, + * rather than Fabric itself: * + * - *.router.ts + * details of the REST endpoints provided by the sample + * - auth.ts + * basic API key authentication strategy used for the sample * - config.ts * descriptions of all the available configuration environment variables + * - jobs.ts + * job queue implementation details + * - logger.ts + * logging implementation details + * - redis.ts + * redis implementation details + * - server.ts + * express server implementation details */ import { Contract } from 'fabric-network'; diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts index 097d19a7..77dc5fe9 100644 --- a/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts @@ -5,8 +5,7 @@ import { Queue } from 'bullmq'; import express, { Request, Response } from 'express'; import { getReasonPhrase, StatusCodes } from 'http-status-codes'; -import { JobNotFoundError } from './errors'; -import { getJobSummary } from './jobs'; +import { getJobSummary, JobNotFoundError } from './jobs'; import { logger } from './logger'; const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes; diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts index 05fe6af0..43ba4d6d 100644 --- a/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts @@ -3,9 +3,9 @@ */ import { Job, Queue } from 'bullmq'; -import { getJobCounts, getJobSummary } from './jobs'; +import { getJobCounts, getJobSummary, processSubmitTransactionJob, JobNotFoundError } from './jobs'; +import { Contract, Transaction } from 'fabric-network'; import { mock, MockProxy } from 'jest-mock-extended'; -import { JobNotFoundError } from './errors'; describe('initJobQueue', () => { it.todo('write tests'); @@ -152,4 +152,131 @@ describe('getJobCounts', () => { waiting: 5, }); }); + + describe('processSubmitTransactionJob', () => { + const mockContracts = new Map(); + const mockPayload = Buffer.from('MOCK PAYLOAD'); + const mockSavedState = Buffer.from('MOCK SAVED STATE'); + let mockTransaction: MockProxy; + let mockContract: MockProxy; + let mockJob: MockProxy; + + beforeEach(() => { + mockTransaction = mock(); + mockTransaction.getTransactionId.mockReturnValue('mockTransactionId'); + + mockContract = mock(); + mockContract.createTransaction + .calledWith('txn') + .mockReturnValue(mockTransaction); + mockContract.deserializeTransaction + .calledWith(mockSavedState) + .mockReturnValue(mockTransaction); + mockContracts.set('mockMspid', mockContract); + + mockJob = mock(); + }); + + it('gets job result with no error or payload if no contract is available for the required mspid', async () => { + mockJob.data = { + mspid: 'missingMspid', + }; + + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob + ); + + expect(jobResult).toStrictEqual({ + transactionError: undefined, + transactionPayload: undefined, + }); + }); + + it('gets a job result containing a payload if the transaction was successful first time', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockResolvedValue(mockPayload); + + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob + ); + + expect(jobResult).toStrictEqual({ + transactionError: undefined, + transactionPayload: Buffer.from('MOCK PAYLOAD'), + }); + }); + + it('gets a job result containing a payload if the transaction was successfully rerun using saved transaction state', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + transactionState: mockSavedState, + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockResolvedValue(mockPayload); + + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob + ); + + expect(jobResult).toStrictEqual({ + transactionError: undefined, + transactionPayload: Buffer.from('MOCK PAYLOAD'), + }); + }); + + it('gets a job result containing an error message if the transaction fails but cannot be retried', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + transactionState: mockSavedState, + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockRejectedValue( + new Error( + 'Failed to get transaction with id txn, error Entry not found in index' + ) + ); + + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob + ); + + expect(jobResult).toStrictEqual({ + transactionError: + 'TransactionNotFoundError: Failed to get transaction with id txn, error Entry not found in index', + transactionPayload: undefined, + }); + }); + + it('throws an error if the transaction fails but can be retried', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + transactionState: mockSavedState, + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockRejectedValue(new Error('MOCK ERROR')); + + await expect(async () => { + await processSubmitTransactionJob(mockContracts, mockJob); + }).rejects.toThrow('MOCK ERROR'); + }); + }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.ts index da397a2d..2b4f2399 100644 --- a/asset-transfer-basic/rest-api-typescript/src/jobs.ts +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.ts @@ -3,17 +3,13 @@ * * This sample uses BullMQ jobs to process submit transactions, which includes * retry support for failing jobs - * - * Important: BullMQ requires the following setting in redis - * maxmemory-policy=noeviction - * For details, see: https://docs.bullmq.io/guide/connections */ import { ConnectionOptions, Job, Queue, QueueScheduler, Worker } from 'bullmq'; import { Contract, Transaction } from 'fabric-network'; import * as config from './config'; -import { JobNotFoundError } from './errors'; -import { processSubmitTransactionJob } from './fabric'; +import { getRetryAction, RetryAction } from './errors'; +import { submitTransaction } from './fabric'; import { logger } from './logger'; export type JobData = { @@ -37,6 +33,18 @@ export type JobSummary = { transactionError?: string; }; +export class JobNotFoundError extends Error { + jobId: string; + + constructor(message: string, jobId: string) { + super(message); + Object.setPrototypeOf(this, JobNotFoundError.prototype); + + this.name = 'JobNotFoundError'; + this.jobId = jobId; + } +} + const connection: ConnectionOptions = { port: config.redisPort, host: config.redisHost, @@ -214,3 +222,108 @@ export const getJobCounts = async ( return jobCounts; }; + +/* + * Process a submit transaction request from the job queue + * + * The job will be retried if this function throws an error + */ +export const processSubmitTransactionJob = async ( + contracts: Map, + job: Job +): Promise => { + logger.debug({ jobId: job.id, jobName: job.name }, 'Processing job'); + + const contract = contracts.get(job.data.mspid); + if (contract === undefined) { + logger.error( + { jobId: job.id, jobName: job.name }, + 'Contract not found for MSP ID %s', + job.data.mspid + ); + + // Retrying will never work without a contract, so give up with an + // empty job result + return { + transactionError: undefined, + transactionPayload: undefined, + }; + } + + const args = job.data.transactionArgs; + let transaction: Transaction; + + if (job.data.transactionState) { + const savedState = job.data.transactionState; + logger.debug( + { + jobId: job.id, + jobName: job.name, + savedState, + }, + 'Reusing previously saved transaction state' + ); + + transaction = contract.deserializeTransaction(savedState); + } else { + logger.debug( + { + jobId: job.id, + jobName: job.name, + }, + 'Using new transaction' + ); + + transaction = contract.createTransaction(job.data.transactionName); + await updateJobData(job, transaction); + } + + logger.debug( + { + jobId: job.id, + jobName: job.name, + transactionId: transaction.getTransactionId(), + }, + 'Submitting transaction' + ); + + try { + const payload = await submitTransaction(transaction, ...args); + + return { + transactionError: undefined, + transactionPayload: payload, + }; + } catch (err) { + const retryAction = getRetryAction(err); + + if (retryAction === RetryAction.None) { + logger.error( + { jobId: job.id, jobName: job.name, err }, + 'Fatal transaction error occurred' + ); + + // Not retriable so return a job result with the error details + return { + transactionError: `${err}`, + transactionPayload: undefined, + }; + } + + logger.warn( + { jobId: job.id, jobName: job.name, err }, + 'Retryable transaction error occurred' + ); + + if (retryAction === RetryAction.WithNewTransactionId) { + logger.debug( + { jobId: job.id, jobName: job.name }, + 'Clearing saved transaction state' + ); + await updateJobData(job, undefined); + } + + // Rethrow the error to keep retrying + throw err; + } +}; diff --git a/asset-transfer-basic/rest-api-typescript/src/redis.ts b/asset-transfer-basic/rest-api-typescript/src/redis.ts index bb4246da..35865545 100644 --- a/asset-transfer-basic/rest-api-typescript/src/redis.ts +++ b/asset-transfer-basic/rest-api-typescript/src/redis.ts @@ -1,7 +1,7 @@ /* * SPDX-License-Identifier: Apache-2.0 * - * TBC + * This sample uses the BullMQ queue system, which is built on top of Redis */ import IORedis, { Redis, RedisOptions } from 'ioredis';