Clarify retry logic

Improve the separation between Fabric logic and the job queue implementation details

Signed-off-by: James Taylor <jamest@uk.ibm.com>
This commit is contained in:
James Taylor 2021-12-08 16:58:27 +00:00
parent b0256a57b5
commit 5d92abc52d
12 changed files with 486 additions and 323 deletions

View file

@ -51,7 +51,7 @@ Create a `.env` file to configure the server for the test network (make sure TES
TEST_NETWORK_HOME=$HOME/fabric-samples/test-network npm run generateEnv
```
Start a Redis server
Start a Redis server (Redis is used to store the queue of submit transactions)
```shell
npm run start:redis

View file

@ -289,9 +289,9 @@ describe('Config values', () => {
});
describe('commitTimeout', () => {
it('defaults to "3000"', () => {
it('defaults to "300"', () => {
const config = require('./config');
expect(config.commitTimeout).toBe(3000);
expect(config.commitTimeout).toBe(300);
});
it('can be configured using the "HLF_COMMIT_TIMEOUT" environment variable', () => {
@ -305,7 +305,7 @@ describe('Config values', () => {
expect(() => {
require('./config');
}).toThrow(
'env-var: "HLF_COMMIT_TIMEOUT" should be a valid integer. An example of a valid value would be: 3000'
'env-var: "HLF_COMMIT_TIMEOUT" should be a valid integer. An example of a valid value would be: 300'
);
});
});

View file

@ -3,12 +3,12 @@
*
* The sample REST server can be configured using the environment variables
* documented below
*
*
* In a local development environment, these variables can be loaded from a
* .env file by starting the server with the following command:
*
*
* npm start:dev
*
*
* The scripts/generateEnv.sh script can be used to generate a suitable .env
* file for the Fabric Test Network
*/

View file

@ -2,15 +2,20 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { TimeoutError, TransactionError } from 'fabric-network';
import {
AssetExistsError,
AssetNotFoundError,
TransactionNotFoundError,
getRetryAction,
handleError,
isDuplicateTransactionError,
isErrorLike,
RetryAction,
} from './errors';
import { mock } from 'jest-mock-extended';
describe('Errors', () => {
describe('isErrorLike', () => {
it('returns false for null', () => {
@ -53,6 +58,24 @@ describe('Errors', () => {
});
describe('isDuplicateTransactionError', () => {
it('returns true for a TransactionError with a transaction code of DUPLICATE_TXID', () => {
const mockDuplicateTransactionError = mock<TransactionError>();
mockDuplicateTransactionError.transactionCode = 'DUPLICATE_TXID';
expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe(
true
);
});
it('returns false for a TransactionError without a transaction code of MVCC_READ_CONFLICT', () => {
const mockDuplicateTransactionError = mock<TransactionError>();
mockDuplicateTransactionError.transactionCode = 'MVCC_READ_CONFLICT';
expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe(
false
);
});
it('returns true for an error when all endorsement details are duplicate transaction found', () => {
const mockDuplicateTransactionError = {
errors: [
@ -155,13 +178,96 @@ describe('Errors', () => {
});
});
describe('getRetryAction', () => {
it('returns RetryAction.None for duplicate transaction errors', () => {
const mockDuplicateTransactionError = {
errors: [
{
endorsements: [
{
details: 'duplicate transaction found',
},
{
details: 'duplicate transaction found',
},
{
details: 'duplicate transaction found',
},
],
},
],
};
expect(getRetryAction(mockDuplicateTransactionError)).toBe(
RetryAction.None
);
});
it('returns RetryAction.None for a TransactionNotFoundError', () => {
const mockTransactionNotFoundError = new TransactionNotFoundError('Failed to get transaction with id txn, error Entry not found in index', 'txn1');
expect(getRetryAction(mockTransactionNotFoundError)).toBe(
RetryAction.None
);
});
it('returns RetryAction.None for an AssetExistsError', () => {
const mockAssetExistsError = new AssetExistsError('The asset MOCK_ASSET already exists', 'txn1');
expect(getRetryAction(mockAssetExistsError)).toBe(
RetryAction.None
);
});
it('returns RetryAction.None for an AssetNotFoundError', () => {
const mockAssetNotFoundError = new AssetNotFoundError('the asset MOCK_ASSET does not exist', 'txn1');
expect(getRetryAction(mockAssetNotFoundError)).toBe(
RetryAction.None
);
});
it('returns RetryAction.WithExistingTransactionId for a TimeoutError', () => {
const mockTimeoutError = new TimeoutError('MOCK TIMEOUT ERROR');
expect(getRetryAction(mockTimeoutError)).toBe(
RetryAction.WithExistingTransactionId
);
});
it('returns RetryAction.WithNewTransactionId for an MVCC_READ_CONFLICT TransactionError', () => {
const mockTransactionError = mock<TransactionError>();
mockTransactionError.transactionCode = 'MVCC_READ_CONFLICT';
expect(getRetryAction(mockTransactionError)).toBe(
RetryAction.WithNewTransactionId
);
});
it('returns RetryAction.WithNewTransactionId for an Error', () => {
const mockError = new Error('MOCK ERROR');
expect(getRetryAction(mockError)).toBe(
RetryAction.WithNewTransactionId
);
});
it('returns RetryAction.WithNewTransactionId for a string error', () => {
const mockError = 'MOCK ERROR';
expect(getRetryAction(mockError)).toBe(
RetryAction.WithNewTransactionId
);
});
});
describe('handleError', () => {
it.each([
'the asset GOCHAINCODE already exists',
'Asset JAVACHAINCODE already exists',
'The asset JSCHAINCODE already exists',
])(
'returns an AssetExistsError for errors with an asset already exists message: %s',
'returns a AssetExistsError for errors with an asset already exists message: %s',
(msg) => {
expect(handleError('txn1', new Error(msg))).toStrictEqual(
new AssetExistsError(msg, 'txn1')
@ -174,7 +280,7 @@ describe('Errors', () => {
'Asset JAVACHAINCODE does not exist',
'The asset JSCHAINCODE does not exist',
])(
'returns an AssetNotFoundError for errors with an asset does not exist message: %s',
'returns a AssetNotFoundError for errors with an asset does not exist message: %s',
(msg) => {
expect(handleError('txn1', new Error(msg))).toStrictEqual(
new AssetNotFoundError(msg, 'txn1')
@ -200,10 +306,8 @@ describe('Errors', () => {
);
});
it('returns a new Error object for errors of other types', () => {
expect(handleError('txn1', 42)).toStrictEqual(
new Error('Unhandled error: 42')
);
it('returns the original error for errors of other types', () => {
expect(handleError('txn1', 42)).toEqual(42);
});
});
});

View file

@ -1,9 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* This file contains all the error handling for Fabric transactions, including
* whether a transaction should be retried.
*/
import { TimeoutError, TransactionError } from 'fabric-network';
import { logger } from './logger';
/*
* Base type for errors from the smart contract.
*
* These errors will not be retried.
*/
export class ContractError extends Error {
transactionId: string;
@ -16,18 +25,23 @@ export class ContractError extends Error {
}
}
/*
* Represents the error which occurs when the transaction being submitted or
* evaluated is not implemented in a smart contract.
*/
export class TransactionNotFoundError extends ContractError {
transactionId: string;
constructor(message: string, transactionId: string) {
super(message, transactionId);
Object.setPrototypeOf(this, TransactionNotFoundError.prototype);
this.name = 'TransactionNotFoundError';
this.transactionId = transactionId;
}
}
/*
* Represents the error which occurs in the basic asset transfer smart contract
* implementation when an asset already exists.
*/
export class AssetExistsError extends ContractError {
constructor(message: string, transactionId: string) {
super(message, transactionId);
@ -37,6 +51,10 @@ export class AssetExistsError extends ContractError {
}
}
/*
* Represents the error which occurs in the basic asset transfer smart contract
* implementation when an asset does not exist.
*/
export class AssetNotFoundError extends ContractError {
constructor(message: string, transactionId: string) {
super(message, transactionId);
@ -46,18 +64,53 @@ export class AssetNotFoundError extends ContractError {
}
}
export class JobNotFoundError extends Error {
jobId: string;
constructor(message: string, jobId: string) {
super(message);
Object.setPrototypeOf(this, JobNotFoundError.prototype);
this.name = 'JobNotFoundError';
this.jobId = jobId;
}
/*
* Enumeration of possible retry actions.
*
* WithExistingTransactionId - transactions should be retried using the same
* transaction ID to protect against duplicate transactions being committed if
* a timeout error occurs
*
* WithNewTransactionId - transactions which could not be committed due to
* other errors require a new transaction ID when retrying
*
* None - transactions that failed due to a duplicate transaction error, or
* errors from the smart contract, should not be retried
*/
export enum RetryAction {
WithExistingTransactionId,
WithNewTransactionId,
None,
}
/*
* Get the required transaction retry action for an error.
*
* For this sample transactions are considered retriable if they fail with any
* error, *except* for duplicate transaction errors, or errors from the smart
* contract.
*
* You might decide to retry transactions which fail with specific errors
* instead, for example:
* MVCC_READ_CONFLICT
* PHANTOM_READ_CONFLICT
* ENDORSEMENT_POLICY_FAILURE
* CHAINCODE_VERSION_CONFLICT
* EXPIRED_CHAINCODE
*/
export const getRetryAction = (err: unknown): RetryAction => {
if (isDuplicateTransactionError(err) || err instanceof ContractError) {
return RetryAction.None;
} else if (err instanceof TimeoutError) {
return RetryAction.WithExistingTransactionId;
}
return RetryAction.WithNewTransactionId;
};
/*
* Type guard to make catching unknown errors easier
*/
export const isErrorLike = (err: unknown): err is Error => {
return (
err != undefined &&
@ -72,23 +125,32 @@ export const isErrorLike = (err: unknown): err is Error => {
/*
* Checks whether an error was caused by a duplicate transaction.
*
* Checking error strings like this is not ideal, unfortunately it appears to
* be the only option. In this case it would be better to check for the
* DUPLICATE_TXID TxValidationCode somehow but that does not seem to be
* possible.
* This is ...painful.
*/
export const isDuplicateTransactionError = (err: unknown): boolean => {
logger.debug({ err }, 'Checking for duplicate transaction error');
if (err === undefined || err === null) return false;
const endorsementError = err as {
errors: { endorsements: { details: string }[] }[];
};
let isDuplicate;
if (typeof (err as TransactionError).transactionCode === 'string') {
// Checking whether a commit failure is caused by a duplicate transaction
// is straightforward because the transaction code should be available
isDuplicate =
(err as TransactionError).transactionCode === 'DUPLICATE_TXID';
} else {
// Checking whether an endorsement failure is caused by a duplicate
// transaction is only possible by processing error strings, which is not ideal.
const endorsementError = err as {
errors: { endorsements: { details: string }[] }[];
};
const isDuplicate = endorsementError?.errors?.some((err) =>
err?.endorsements?.some((endorsement) =>
endorsement?.details?.startsWith('duplicate transaction found')
)
);
isDuplicate = endorsementError?.errors?.some((err) =>
err?.endorsements?.some((endorsement) =>
endorsement?.details?.startsWith('duplicate transaction found')
)
);
}
return isDuplicate === true;
};
@ -167,27 +229,18 @@ const matchTransactionDoesNotExistMessage = (
return null;
};
export const isContractError = (err: unknown): boolean => {
if (
err instanceof AssetExistsError ||
err instanceof AssetNotFoundError ||
err instanceof TransactionNotFoundError
) {
return true;
}
return false;
};
/*
* Handles errors from evaluating and submitting transactions.
*
* As with duplicate transaction errors, checking error strings like this is
* not ideal. Unfortunately the chaincode samples do not use error codes so
* again it's the only option. The error message text is not even the same for
* the Go, Java, and Javascript implementations of the chaincode!
* Smart contract errors from the the basic asset transfer samples do not use
* error codes so matching strings is the only option, which is not ideal.
* Note: the error message text is not the same for the Go, Java, and
* Javascript implementations of the chaincode!
*/
export const handleError = (transactionId: string, err: unknown): Error => {
export const handleError = (
transactionId: string,
err: unknown
): Error | unknown => {
logger.debug({ transactionId: transactionId, err }, 'Processing error');
if (isErrorLike(err)) {
@ -210,9 +263,7 @@ export const handleError = (transactionId: string, err: unknown): Error => {
transactionId
);
}
return err;
}
return new Error(`Unhandled error: ${err}`);
return err;
};

View file

@ -11,7 +11,6 @@ import {
submitTransaction,
getBlockHeight,
getTransactionValidationCode,
processSubmitTransactionJob,
} from './fabric';
import * as config from './config';
@ -34,7 +33,6 @@ import * as fabricProtos from 'fabric-protos';
import { MockProxy, mock } from 'jest-mock-extended';
import Long from 'long';
import { Job } from 'bullmq';
jest.mock('./config');
jest.mock('fabric-network', () => {
@ -117,133 +115,6 @@ describe('Fabric', () => {
});
});
describe('processSubmitTransactionJob', () => {
const mockContracts = new Map<string, Contract>();
const mockPayload = Buffer.from('MOCK PAYLOAD');
const mockSavedState = Buffer.from('MOCK SAVED STATE');
let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>;
let mockJob: MockProxy<Job>;
beforeEach(() => {
mockTransaction = mock<Transaction>();
mockTransaction.getTransactionId.mockReturnValue('mockTransactionId');
mockContract = mock<Contract>();
mockContract.createTransaction
.calledWith('txn')
.mockReturnValue(mockTransaction);
mockContract.deserializeTransaction
.calledWith(mockSavedState)
.mockReturnValue(mockTransaction);
mockContracts.set('mockMspid', mockContract);
mockJob = mock<Job>();
});
it('gets job result with no error or payload if no contract is available for the required mspid', async () => {
mockJob.data = {
mspid: 'missingMspid',
};
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: undefined,
});
});
it('gets a job result containing a payload if the transaction was successful first time', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockResolvedValue(mockPayload);
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: Buffer.from('MOCK PAYLOAD'),
});
});
it('gets a job result containing a payload if the transaction was successfully rerun using saved transaction state', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockResolvedValue(mockPayload);
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: Buffer.from('MOCK PAYLOAD'),
});
});
it('gets a job result containing an error message if the transaction fails but cannot be retried', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockRejectedValue(
new Error(
'Failed to get transaction with id txn, error Entry not found in index'
)
);
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError:
'TransactionNotFoundError: Failed to get transaction with id txn, error Entry not found in index',
transactionPayload: undefined,
});
});
it('throws an error if the transaction fails but can be retried', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockRejectedValue(new Error('MOCK ERROR'));
await expect(async () => {
await processSubmitTransactionJob(mockContracts, mockJob);
}).rejects.toThrow('MOCK ERROR');
});
});
describe('evatuateTransaction', () => {
const mockPayload = Buffer.from('MOCK PAYLOAD');
let mockTransaction: MockProxy<Transaction>;

View file

@ -10,20 +10,13 @@ import {
GatewayOptions,
Wallets,
Network,
TimeoutError,
Transaction,
Wallet,
} from 'fabric-network';
import * as config from './config';
import { logger } from './logger';
import {
handleError,
isContractError,
isDuplicateTransactionError,
} from './errors';
import { handleError } from './errors';
import * as protos from 'fabric-protos';
import { Job } from 'bullmq';
import { JobData, JobResult, updateJobData } from './jobs';
/*
* Creates an in memory wallet to hold credentials for an Org1 and Org2 user
@ -120,122 +113,6 @@ export const getContracts = async (
return { assetContract, qsccContract };
};
/*
* Process a submit transaction request from the job queue
*
* For this sample transactions are retried if they fail with any error,
* except for errors from the smart contract, or duplicate transaction
* errors
*
* You might decide to retry transactions which fail with specific errors
* instead, for example:
* MVCC_READ_CONFLICT
* PHANTOM_READ_CONFLICT
* ENDORSEMENT_POLICY_FAILURE
* CHAINCODE_VERSION_CONFLICT
* EXPIRED_CHAINCODE
*/
export const processSubmitTransactionJob = async (
contracts: Map<string, Contract>,
job: Job<JobData, JobResult>
): Promise<JobResult> => {
logger.debug({ jobId: job.id, jobName: job.name }, 'Processing job');
const contract = contracts.get(job.data.mspid);
if (contract === undefined) {
logger.error(
{ jobId: job.id, jobName: job.name },
'Contract not found for MSP ID %s',
job.data.mspid
);
// Retrying will not work, so give up with an unsuccessful result
return {
transactionError: undefined,
transactionPayload: undefined,
};
}
let transaction: Transaction;
if (job.data.transactionState) {
const savedState = job.data.transactionState;
logger.debug(
{
jobId: job.id,
jobName: job.name,
savedState,
},
'Using previously saved transaction state'
);
transaction = contract.deserializeTransaction(savedState);
} else {
logger.debug(
{
jobId: job.id,
jobName: job.name,
},
'Using new transaction'
);
transaction = contract.createTransaction(job.data.transactionName);
await updateJobData(job, transaction);
}
try {
logger.debug(
{
jobId: job.id,
jobName: job.name,
transactionId: transaction.getTransactionId(),
},
'Submitting transaction'
);
const args = job.data.transactionArgs;
const payload = await submitTransaction(transaction, ...args);
return {
transactionError: undefined,
transactionPayload: payload,
};
} catch (err) {
if (
err instanceof Error &&
(isContractError(err) || isDuplicateTransactionError(err))
) {
logger.error(
{ jobId: job.id, jobName: job.name, err },
'Fatal transaction error occurred'
);
// Return a job result to stop retrying
return {
transactionError: err.toString(),
transactionPayload: undefined,
};
} else {
logger.warn(
{ jobId: job.id, jobName: job.name, err },
'Retryable transaction error occurred'
);
// The original transaction may eventually get committed in the case of
// a timeout error, so keep the same transaction ID to protect against
// unintended duplicate transactions
if (!(err instanceof TimeoutError)) {
logger.debug(
{ jobId: job.id, jobName: job.name },
'Clearing saved transaction state'
);
await updateJobData(job, undefined);
}
// Rethrow the error to keep retrying
throw err;
}
}
};
/*
* Evaluate a transaction and handle any errors
*/

View file

@ -4,11 +4,32 @@
* This is the main entrypoint for the sample REST server, which is responsible
* for connecting to the Fabric network and setting up a job queue for
* processing submit transactions
*
* You can find more details related to the Fabric aspects of the sample in the
* following files:
*
* - errors.ts
* Fabric transaction error handling and retry logic
* - fabric.ts
* all the sample code which interacts with the Fabric SDK
*
* You can find details of other aspects of the sample in the following files:
* The remaining files are related to the REST server aspects of the sample,
* rather than Fabric itself:
*
* - *.router.ts
* details of the REST endpoints provided by the sample
* - auth.ts
* basic API key authentication strategy used for the sample
* - config.ts
* descriptions of all the available configuration environment variables
* - jobs.ts
* job queue implementation details
* - logger.ts
* logging implementation details
* - redis.ts
* redis implementation details
* - server.ts
* express server implementation details
*/
import { Contract } from 'fabric-network';

View file

@ -5,8 +5,7 @@
import { Queue } from 'bullmq';
import express, { Request, Response } from 'express';
import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { JobNotFoundError } from './errors';
import { getJobSummary } from './jobs';
import { getJobSummary, JobNotFoundError } from './jobs';
import { logger } from './logger';
const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes;

View file

@ -3,9 +3,9 @@
*/
import { Job, Queue } from 'bullmq';
import { getJobCounts, getJobSummary } from './jobs';
import { getJobCounts, getJobSummary, processSubmitTransactionJob, JobNotFoundError } from './jobs';
import { Contract, Transaction } from 'fabric-network';
import { mock, MockProxy } from 'jest-mock-extended';
import { JobNotFoundError } from './errors';
describe('initJobQueue', () => {
it.todo('write tests');
@ -152,4 +152,131 @@ describe('getJobCounts', () => {
waiting: 5,
});
});
describe('processSubmitTransactionJob', () => {
const mockContracts = new Map<string, Contract>();
const mockPayload = Buffer.from('MOCK PAYLOAD');
const mockSavedState = Buffer.from('MOCK SAVED STATE');
let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>;
let mockJob: MockProxy<Job>;
beforeEach(() => {
mockTransaction = mock<Transaction>();
mockTransaction.getTransactionId.mockReturnValue('mockTransactionId');
mockContract = mock<Contract>();
mockContract.createTransaction
.calledWith('txn')
.mockReturnValue(mockTransaction);
mockContract.deserializeTransaction
.calledWith(mockSavedState)
.mockReturnValue(mockTransaction);
mockContracts.set('mockMspid', mockContract);
mockJob = mock<Job>();
});
it('gets job result with no error or payload if no contract is available for the required mspid', async () => {
mockJob.data = {
mspid: 'missingMspid',
};
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: undefined,
});
});
it('gets a job result containing a payload if the transaction was successful first time', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockResolvedValue(mockPayload);
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: Buffer.from('MOCK PAYLOAD'),
});
});
it('gets a job result containing a payload if the transaction was successfully rerun using saved transaction state', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockResolvedValue(mockPayload);
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: Buffer.from('MOCK PAYLOAD'),
});
});
it('gets a job result containing an error message if the transaction fails but cannot be retried', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockRejectedValue(
new Error(
'Failed to get transaction with id txn, error Entry not found in index'
)
);
const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
expect(jobResult).toStrictEqual({
transactionError:
'TransactionNotFoundError: Failed to get transaction with id txn, error Entry not found in index',
transactionPayload: undefined,
});
});
it('throws an error if the transaction fails but can be retried', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockRejectedValue(new Error('MOCK ERROR'));
await expect(async () => {
await processSubmitTransactionJob(mockContracts, mockJob);
}).rejects.toThrow('MOCK ERROR');
});
});
});

View file

@ -3,17 +3,13 @@
*
* This sample uses BullMQ jobs to process submit transactions, which includes
* retry support for failing jobs
*
* Important: BullMQ requires the following setting in redis
* maxmemory-policy=noeviction
* For details, see: https://docs.bullmq.io/guide/connections
*/
import { ConnectionOptions, Job, Queue, QueueScheduler, Worker } from 'bullmq';
import { Contract, Transaction } from 'fabric-network';
import * as config from './config';
import { JobNotFoundError } from './errors';
import { processSubmitTransactionJob } from './fabric';
import { getRetryAction, RetryAction } from './errors';
import { submitTransaction } from './fabric';
import { logger } from './logger';
export type JobData = {
@ -37,6 +33,18 @@ export type JobSummary = {
transactionError?: string;
};
export class JobNotFoundError extends Error {
jobId: string;
constructor(message: string, jobId: string) {
super(message);
Object.setPrototypeOf(this, JobNotFoundError.prototype);
this.name = 'JobNotFoundError';
this.jobId = jobId;
}
}
const connection: ConnectionOptions = {
port: config.redisPort,
host: config.redisHost,
@ -214,3 +222,108 @@ export const getJobCounts = async (
return jobCounts;
};
/*
* Process a submit transaction request from the job queue
*
* The job will be retried if this function throws an error
*/
export const processSubmitTransactionJob = async (
contracts: Map<string, Contract>,
job: Job<JobData, JobResult>
): Promise<JobResult> => {
logger.debug({ jobId: job.id, jobName: job.name }, 'Processing job');
const contract = contracts.get(job.data.mspid);
if (contract === undefined) {
logger.error(
{ jobId: job.id, jobName: job.name },
'Contract not found for MSP ID %s',
job.data.mspid
);
// Retrying will never work without a contract, so give up with an
// empty job result
return {
transactionError: undefined,
transactionPayload: undefined,
};
}
const args = job.data.transactionArgs;
let transaction: Transaction;
if (job.data.transactionState) {
const savedState = job.data.transactionState;
logger.debug(
{
jobId: job.id,
jobName: job.name,
savedState,
},
'Reusing previously saved transaction state'
);
transaction = contract.deserializeTransaction(savedState);
} else {
logger.debug(
{
jobId: job.id,
jobName: job.name,
},
'Using new transaction'
);
transaction = contract.createTransaction(job.data.transactionName);
await updateJobData(job, transaction);
}
logger.debug(
{
jobId: job.id,
jobName: job.name,
transactionId: transaction.getTransactionId(),
},
'Submitting transaction'
);
try {
const payload = await submitTransaction(transaction, ...args);
return {
transactionError: undefined,
transactionPayload: payload,
};
} catch (err) {
const retryAction = getRetryAction(err);
if (retryAction === RetryAction.None) {
logger.error(
{ jobId: job.id, jobName: job.name, err },
'Fatal transaction error occurred'
);
// Not retriable so return a job result with the error details
return {
transactionError: `${err}`,
transactionPayload: undefined,
};
}
logger.warn(
{ jobId: job.id, jobName: job.name, err },
'Retryable transaction error occurred'
);
if (retryAction === RetryAction.WithNewTransactionId) {
logger.debug(
{ jobId: job.id, jobName: job.name },
'Clearing saved transaction state'
);
await updateJobData(job, undefined);
}
// Rethrow the error to keep retrying
throw err;
}
};

View file

@ -1,7 +1,7 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* TBC
* This sample uses the BullMQ queue system, which is built on top of Redis
*/
import IORedis, { Redis, RedisOptions } from 'ioredis';