From ad3fd7e832f9dd349ec2fd2bcdfc29e74a6296e1 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Wed, 22 Sep 2021 19:27:38 +0100 Subject: [PATCH] Update retry logic Previously transactions were only retried after being successfully endorsed, and always with the same transaction ID Transactions will now be added to a queue for processing and will also be retried if endorsement fails (with a different transaction id for invalid transactions) Signed-off-by: James Taylor --- .../rest-api-typescript/README.md | 57 ++- .../rest-api-typescript/demo.http | 5 + .../rest-api-typescript/package-lock.json | 130 ++++- .../rest-api-typescript/package.json | 5 +- .../src/__mocks__/fabric-network.ts | 178 ------- .../src/__tests__/api.test.ts | 298 +++++++---- .../rest-api-typescript/src/assets.router.ts | 131 ++--- .../rest-api-typescript/src/config.spec.ts | 168 +++++-- .../rest-api-typescript/src/config.ts | 70 ++- .../rest-api-typescript/src/errors.spec.ts | 143 +++++- .../rest-api-typescript/src/errors.ts | 213 +++++--- .../rest-api-typescript/src/fabric.spec.ts | 472 +++++++----------- .../rest-api-typescript/src/fabric.ts | 342 ++++++------- .../rest-api-typescript/src/health.router.ts | 23 +- .../rest-api-typescript/src/index.ts | 82 ++- .../rest-api-typescript/src/jobs.router.ts | 41 ++ .../rest-api-typescript/src/jobs.spec.ts | 155 ++++++ .../rest-api-typescript/src/jobs.ts | 216 ++++++++ .../rest-api-typescript/src/redis.spec.ts | 188 +------ .../rest-api-typescript/src/redis.ts | 208 ++------ .../rest-api-typescript/src/server.ts | 68 +-- .../src/transactions.router.ts | 85 +--- 22 files changed, 1826 insertions(+), 1452 deletions(-) delete mode 100644 asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts create mode 100644 asset-transfer-basic/rest-api-typescript/src/jobs.router.ts create mode 100644 asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts create mode 100644 asset-transfer-basic/rest-api-typescript/src/jobs.ts diff --git a/asset-transfer-basic/rest-api-typescript/README.md b/asset-transfer-basic/rest-api-typescript/README.md index ea61b775..b5191620 100644 --- a/asset-transfer-basic/rest-api-typescript/README.md +++ b/asset-transfer-basic/rest-api-typescript/README.md @@ -8,6 +8,21 @@ The REST API is intended to work with the [basic asset transfer example](https:/ To install the basic asset transfer chaincode on a local Fabric network, follow the [Using the Fabric test network](https://hyperledger-fabric.readthedocs.io/en/release-2.2/test_network.html) tutorial +## Overview + +The sample creates two long lived connections to a Fabric network in order to submit and evaluate transactions using two different identities + +To ensure requests respond quickly enough to avoid timeouts, all submit transactions are queued for processing and will be retried if they fail + +Submit transactions are retried if they fail with any error, except for errors from the smart contract, or duplicate transaction errors + +Alternatively you might prefer to modify the sample to only retry transactions which fail with specific errors instead, for example: +- MVCC_READ_CONFLICT +- PHANTOM_READ_CONFLICT +- ENDORSEMENT_POLICY_FAILURE +- CHAINCODE_VERSION_CONFLICT +- EXPIRED_CHAINCODE + ## Usage **Note:** these instructions should work with the release-2.2 branch of `fabric-samples` but later versions require some changes @@ -70,7 +85,7 @@ docker-compose up -d ## REST API -If everything went well, you can now make basic asset transfer REST calls! +If everything went well, you can now open a new terminal and try out some basic asset transfer REST calls! The examples below require a `SAMPLE_APIKEY` environment variable which must be set to an API key from the `.env` file created above. @@ -86,6 +101,12 @@ SAMPLE_APIKEY=$(grep ORG1_APIKEY .env | cut -d '=' -f 2-) curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/assets ``` +You should see all the available assets, for example + +``` +[{"AppraisedValue":300,"Color":"blue","ID":"asset1","Owner":"Tomoko","Size":5},{"AppraisedValue":400,"Color":"red","ID":"asset2","Owner":"Brad","Size":5},{"AppraisedValue":500,"Color":"green","ID":"asset3","Owner":"Jin Soo","Size":10},{"AppraisedValue":600,"Color":"yellow","ID":"asset4","Owner":"Max","Size":10},{"AppraisedValue":700,"Color":"black","ID":"asset5","Owner":"Adriana","Size":15},{"AppraisedValue":800,"Color":"white","ID":"asset6","Owner":"Michel","Size":15}] +``` + ### Check whether an asset exists... ```shell @@ -98,18 +119,52 @@ curl --include --header "X-Api-Key: ${SAMPLE_APIKEY}" --request OPTIONS http://l curl --include --header "Content-Type: application/json" --header "X-Api-Key: ${SAMPLE_APIKEY}" --request POST --data '{"id":"asset7","color":"red","size":42,"owner":"Jean","appraisedValue":101}' http://localhost:3000/api/assets ``` +The response should include a `jobId` which you can use to check the job status in next step + +``` +{"status":"Accepted","jobId":"1","timestamp":"2021-10-22T16:27:09.426Z"} +``` + +### Read job status... + +```shell +curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/jobs/__job_id__ +``` + +The response should include a list of `transactionIds` which you can use to check the transaction status in next step, for example + +``` +{"jobId":"1","transactionIds":["1dd35c2e5d840fec1dccc6e8cfce886c660c103de3e7b93dd774d04f39eef82a"],"transactionPayload":""} +``` + +There may be more transaction IDs if the job was retried + ### Read transaction status... ```shell curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/transactions/__transaction_id__ ``` +The response will show the validation code of the transaction, for example + +``` +{"transactionId":"1dd35c2e5d840fec1dccc6e8cfce886c660c103de3e7b93dd774d04f39eef82a","validationCode":"VALID"} +``` + +Alternatively, you will get a 404 not found response if the transaction was not committed + ### Read an asset... ```shell curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/assets/asset7 ``` +You should see the newly created asset, for example + +``` +{"AppraisedValue":101,"Color":"red","ID":"asset7","Owner":"Jean","Size":42} +``` + ### Update an asset... ```shell diff --git a/asset-transfer-basic/rest-api-typescript/demo.http b/asset-transfer-basic/rest-api-typescript/demo.http index 163c7632..13ebfe99 100644 --- a/asset-transfer-basic/rest-api-typescript/demo.http +++ b/asset-transfer-basic/rest-api-typescript/demo.http @@ -40,6 +40,11 @@ X-Api-Key: {{api-key}} "appraisedValue": 101 } +### Read job status + +GET {{apiUrl}}/jobs/__job_id__ HTTP/1.1 +X-Api-Key: {{api-key}} + ### Read transaction status GET {{apiUrl}}/transactions/__transaction_id__ HTTP/1.1 diff --git a/asset-transfer-basic/rest-api-typescript/package-lock.json b/asset-transfer-basic/rest-api-typescript/package-lock.json index e7e35369..c56206e9 100644 --- a/asset-transfer-basic/rest-api-typescript/package-lock.json +++ b/asset-transfer-basic/rest-api-typescript/package-lock.json @@ -1955,6 +1955,67 @@ "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz", "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==" }, + "bullmq": { + "version": "1.47.2", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-1.47.2.tgz", + "integrity": "sha512-IMzWjXdw6B5RSqPyEiOvoA0efjfTFx2DuB1N+z3T2wYcOVLIcIFybbFjhqVn9Sv/Zb5l6TpuFiU52P+C+/DpNA==", + "requires": { + "@types/ioredis": "^4.27.0", + "cron-parser": "^2.7.3", + "get-port": "^5.0.0", + "ioredis": "^4.27.8", + "lodash": "^4.17.21", + "semver": "^6.3.0", + "tslib": "^1.10.0", + "uuid": "^8.3.2" + }, + "dependencies": { + "@types/ioredis": { + "version": "4.27.4", + "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.27.4.tgz", + "integrity": "sha512-uTAA/woL//GxXQI1e9FuUoDZCpP8yn5LXQdea1IEFyLtb8GP2w3HfOE+SqglF6QSAp/3cZLWzrMhHqWSYI3bfg==", + "requires": { + "@types/node": "*" + } + }, + "debug": { + "version": "4.3.2", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.2.tgz", + "integrity": "sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw==", + "requires": { + "ms": "2.1.2" + } + }, + "ioredis": { + "version": "4.27.9", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.9.tgz", + "integrity": "sha512-hAwrx9F+OQ0uIvaJefuS3UTqW+ByOLyLIV+j0EH8ClNVxvFyH9Vmb08hCL4yje6mDYT5zMquShhypkd50RRzkg==", + "requires": { + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.1", + "denque": "^1.1.0", + "lodash.defaults": "^4.2.0", + "lodash.flatten": "^4.4.0", + "lodash.isarguments": "^3.1.0", + "p-map": "^2.1.0", + "redis-commands": "1.7.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + } + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, + "semver": { + "version": "6.3.0", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", + "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==" + } + } + }, "bytes": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", @@ -2179,6 +2240,15 @@ "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "dev": true }, + "cron-parser": { + "version": "2.18.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-2.18.0.tgz", + "integrity": "sha512-s4odpheTyydAbTBQepsqd2rNWGa2iV3cyo8g7zbI2QQYGLVsfbhmwukayS1XHppe02Oy1fg7mg6xoaraVJeEcg==", + "requires": { + "is-nan": "^1.3.0", + "moment-timezone": "^0.5.31" + } + }, "cross-spawn": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", @@ -2267,6 +2337,14 @@ "integrity": "sha512-FJ3UgI4gIl+PHZm53knsuSFpE+nESMr7M4v9QcgB7S63Kj/6WqMiFQJpBBYz1Pt+66bZpP3Q7Lye0Oo9MPKEdg==", "dev": true }, + "define-properties": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.3.tgz", + "integrity": "sha512-3MqfYKj2lLzdMSf8ZIZE/V+Zuy+BgD6f164e8K2w7dgnpKArBDerGYpM46IYYcjnkdPNMjPk9A6VFB8+3SKlXQ==", + "requires": { + "object-keys": "^1.0.12" + } + }, "delayed-stream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", @@ -3105,6 +3183,11 @@ "integrity": "sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==", "dev": true }, + "get-port": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz", + "integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==" + }, "get-stream": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-6.0.1.tgz", @@ -3360,15 +3443,16 @@ "integrity": "sha512-7PnF4oN3CvZF23ADhA5wRaYEQpJ8qygSkbtTXWBeXWXmEVRXK+1ITciHWwHhsjv1TmW0MgacIv6hEi5pX5NQdA==" }, "ioredis": { - "version": "4.27.6", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.6.tgz", - "integrity": "sha512-6W3ZHMbpCa8ByMyC1LJGOi7P2WiOKP9B3resoZOVLDhi+6dDBOW+KNsRq3yI36Hmnb2sifCxHX+YSarTeXh48A==", + "version": "4.27.9", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.9.tgz", + "integrity": "sha512-hAwrx9F+OQ0uIvaJefuS3UTqW+ByOLyLIV+j0EH8ClNVxvFyH9Vmb08hCL4yje6mDYT5zMquShhypkd50RRzkg==", "requires": { "cluster-key-slot": "^1.1.0", "debug": "^4.3.1", "denque": "^1.1.0", "lodash.defaults": "^4.2.0", "lodash.flatten": "^4.4.0", + "lodash.isarguments": "^3.1.0", "p-map": "^2.1.0", "redis-commands": "1.7.0", "redis-errors": "^1.2.0", @@ -3452,6 +3536,15 @@ "is-extglob": "^2.1.1" } }, + "is-nan": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/is-nan/-/is-nan-1.3.2.tgz", + "integrity": "sha512-E+zBKpQ2t6MEo1VsonYmluk9NxGrbzpeeLC2xIViuO2EjU2xsXsBPwTr3Ykv9l08UYEVEdWeRZNouaZqF6RN0w==", + "requires": { + "call-bind": "^1.0.0", + "define-properties": "^1.1.3" + } + }, "is-number": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", @@ -4997,6 +5090,11 @@ "resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz", "integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=" }, + "lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha1-L1c9hcaiQon/AGY7SRwdM4/zRYo=" + }, "lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", @@ -5147,6 +5245,19 @@ "integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==", "dev": true }, + "moment": { + "version": "2.29.1", + "resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz", + "integrity": "sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ==" + }, + "moment-timezone": { + "version": "0.5.33", + "resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.33.tgz", + "integrity": "sha512-PTc2vcT8K9J5/9rDEPe5czSIKgLoGsH8UNpA4qZTVw0Vd/Uz19geE9abbIOQKaAQFcnQ3v5YEXrbSc5BpshH+w==", + "requires": { + "moment": ">= 2.9.0" + } + }, "mri": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/mri/-/mri-1.1.4.tgz", @@ -5252,6 +5363,11 @@ "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.10.3.tgz", "integrity": "sha512-e5mCJlSH7poANfC8z8S9s9S2IN5/4Zb3aZ33f5s8YqoazCFzNLloLU8r5VCG+G7WoqLvAAZoVMcy3tp/3X0Plw==" }, + "object-keys": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", + "integrity": "sha512-NuAESUOUMrlIXOfHKzD6bpPu3tYt3xvjNdRIQ+FeT0lNb4K8WR70CaDxhuNguS2XG+GjkyMwOzsN5ZktImfhLA==" + }, "on-finished": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.3.0.tgz", @@ -6378,8 +6494,7 @@ "tslib": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz", - "integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==", - "dev": true + "integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==" }, "tsutils": { "version": "3.21.0", @@ -6473,6 +6588,11 @@ "resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz", "integrity": "sha1-n5VxD1CiZ5R7LMwSR0HBAoQn5xM=" }, + "uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==" + }, "v8-compile-cache": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.3.0.tgz", diff --git a/asset-transfer-basic/rest-api-typescript/package.json b/asset-transfer-basic/rest-api-typescript/package.json index 815f9e19..6a9317ff 100644 --- a/asset-transfer-basic/rest-api-typescript/package.json +++ b/asset-transfer-basic/rest-api-typescript/package.json @@ -4,6 +4,7 @@ "description": "Asset Transfer Basic REST API implemented in TypeScript", "main": "dist/index.js", "dependencies": { + "bullmq": "^1.47.2", "dotenv": "^10.0.0", "env-var": "^7.0.1", "express": "^4.17.1", @@ -11,7 +12,7 @@ "fabric-network": "^2.2.8", "helmet": "^4.6.0", "http-status-codes": "^2.1.4", - "ioredis": "^4.27.6", + "ioredis": "^4.27.8", "passport": "^0.4.1", "passport-headerapikey": "^1.2.2", "pino": "^6.11.3", @@ -53,7 +54,7 @@ "start": "node --require source-map-support/register ./dist", "start:dotenv": "node --require source-map-support/register --require dotenv/config ./dist", "start:dev": "node --require source-map-support/register --require dotenv/config ./dist | pino-pretty", - "start:redis": "docker run -p 6379:6379 --name fabric-sample-redis -d redis", + "start:redis": "docker run -p 6379:6379 --name fabric-sample-redis -d redis --maxmemory-policy noeviction", "test": "jest" }, "author": "Hyperledger", diff --git a/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts b/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts deleted file mode 100644 index 4f58ae24..00000000 --- a/asset-transfer-basic/rest-api-typescript/src/__mocks__/fabric-network.ts +++ /dev/null @@ -1,178 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - */ - -import { mock } from 'jest-mock-extended'; -import { Contract, Network, Transaction } from 'fabric-network'; -import { mocked } from 'ts-jest/utils'; -import * as fabricProtos from 'fabric-protos'; - -const actualFabricNetwork = jest.requireActual('fabric-network'); -const Wallet = actualFabricNetwork.Wallet; -const Wallets = actualFabricNetwork.Wallets; - -const mockAsset1 = { - ID: 'asset1', - Color: 'blue', - Size: 5, - Owner: 'Tomoko', - AppraisedValue: 300, -}; -const mockAsset1Buffer = Buffer.from(JSON.stringify(mockAsset1)); - -const mockAsset2 = { - ID: 'asset2', - Color: 'red', - Size: 5, - Owner: 'Brad', - AppraisedValue: 400, -}; - -const mockAllAssetsBuffer = Buffer.from( - JSON.stringify([mockAsset1, mockAsset2]) -); - -const mockBlockchainInfoProto = fabricProtos.common.BlockchainInfo.create(); -mockBlockchainInfoProto.height = 42; -const mockBlockchainInfoBuffer = Buffer.from( - fabricProtos.common.BlockchainInfo.encode(mockBlockchainInfoProto).finish() -); - -const processedTransactionProto = - fabricProtos.protos.ProcessedTransaction.create(); -processedTransactionProto.validationCode = - fabricProtos.protos.TxValidationCode.VALID; -const processedTransactionBuffer = Buffer.from( - fabricProtos.protos.ProcessedTransaction.encode( - processedTransactionProto - ).finish() -); - -type FabricNetworkModule = jest.Mocked; - -const { - DefaultEventHandlerStrategies, - DefaultQueryHandlerStrategies, - Gateway, -}: FabricNetworkModule = jest.createMockFromModule('fabric-network'); - -const mockAssetExistsTransaction = mock(); -mockAssetExistsTransaction.evaluate - .calledWith('asset1') - .mockResolvedValue(Buffer.from('true')); -mockAssetExistsTransaction.evaluate - .calledWith('asset3') - .mockResolvedValue(Buffer.from('false')); - -const mockReadAssetTransaction = mock(); -mockReadAssetTransaction.evaluate - .calledWith('asset1') - .mockResolvedValue(mockAsset1Buffer); -mockReadAssetTransaction.evaluate - .calledWith('asset3') - .mockRejectedValue(new Error('the asset asset3 does not exist')); - -const mockCreateAssetTransaction = mock(); -mockCreateAssetTransaction.getTransactionId.mockReturnValue('txn1'); -mockCreateAssetTransaction.submit - .calledWith('asset1') - .mockRejectedValue( - new Error( - 'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset1 already exists\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 already exists' - ) - ); - -// NOTE: only the second mocked GetAllAssets with return no assets -// TODO find a better alternative so that test order does not matter -const mockGetAllAssetsTransaction = mock(); -mockGetAllAssetsTransaction.evaluate - .mockResolvedValueOnce(Buffer.from('')) - .mockResolvedValueOnce(mockAllAssetsBuffer); - -const mockUpdateAssetTransaction = mock(); -mockUpdateAssetTransaction.getTransactionId.mockReturnValue('txn1'); -mockUpdateAssetTransaction.submit - .calledWith('asset3') - .mockRejectedValue( - new Error( - 'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset3 does not exist\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 does not exist' - ) - ); - -const mockTransferAssetTransaction = mock(); -mockTransferAssetTransaction.getTransactionId.mockReturnValue('txn1'); -mockTransferAssetTransaction.submit - .calledWith('asset3') - .mockRejectedValue( - new Error( - 'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset3 does not exist\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 does not exist' - ) - ); - -const mockDeleteAssetTransaction = mock(); -mockDeleteAssetTransaction.getTransactionId.mockReturnValue('txn1'); -mockDeleteAssetTransaction.submit - .calledWith('asset3') - .mockRejectedValue( - new Error( - 'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset3 does not exist\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 does not exist' - ) - ); - -const mockBasicContract = mock(); -mockBasicContract.createTransaction - .calledWith('AssetExists') - .mockReturnValue(mockAssetExistsTransaction); -mockBasicContract.createTransaction - .calledWith('ReadAsset') - .mockReturnValue(mockReadAssetTransaction); -mockBasicContract.createTransaction - .calledWith('CreateAsset') - .mockReturnValue(mockCreateAssetTransaction); -mockBasicContract.createTransaction - .calledWith('GetAllAssets') - .mockReturnValue(mockGetAllAssetsTransaction); -mockBasicContract.createTransaction - .calledWith('UpdateAsset') - .mockReturnValue(mockUpdateAssetTransaction); -mockBasicContract.createTransaction - .calledWith('TransferAsset') - .mockReturnValue(mockTransferAssetTransaction); -mockBasicContract.createTransaction - .calledWith('DeleteAsset') - .mockReturnValue(mockDeleteAssetTransaction); - -const mockGetTransactionByIDTransaction = mock(); -mockGetTransactionByIDTransaction.evaluate - .calledWith('mychannel', 'txn2') - .mockResolvedValue(processedTransactionBuffer); -mockGetTransactionByIDTransaction.evaluate - .calledWith('mychannel', 'txn3') - .mockRejectedValue( - new Error( - 'Failed to get transaction with id txn3, error Entry not found in index' - ) - ); - -const mockSystemContract = mock(); -mockSystemContract.evaluateTransaction - .calledWith('GetChainInfo') - .mockResolvedValue(mockBlockchainInfoBuffer); -mockSystemContract.createTransaction - .calledWith('GetTransactionByID') - .mockReturnValue(mockGetTransactionByIDTransaction); - -const mockNetwork = mock(); -mockNetwork.getContract.calledWith('basic').mockReturnValue(mockBasicContract); -mockNetwork.getContract.calledWith('qscc').mockReturnValue(mockSystemContract); - -mocked(Gateway.prototype.getNetwork).mockResolvedValue(mockNetwork); - -export { - DefaultEventHandlerStrategies, - DefaultQueryHandlerStrategies, - Contract, - Gateway, - Wallet, - Wallets, -}; diff --git a/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts b/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts index 5b065426..06d29dba 100644 --- a/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts +++ b/asset-transfer-basic/rest-api-typescript/src/__tests__/api.test.ts @@ -2,20 +2,53 @@ * SPDX-License-Identifier: Apache-2.0 */ -jest.mock('fabric-network'); -jest.mock('ioredis', () => require('ioredis-mock/jest')); - -import { createServer } from '../server'; +import { Job, Queue } from 'bullmq'; import { Application } from 'express'; +import { Contract, Transaction } from 'fabric-network'; +import * as fabricProtos from 'fabric-protos'; +import { mock, MockProxy } from 'jest-mock-extended'; +import { mocked } from 'ts-jest/utils'; import request from 'supertest'; +import * as config from '../config'; +import { createServer } from '../server'; + +jest.mock('../config'); +jest.mock('bullmq'); + +const mockAsset1 = { + ID: 'asset1', + Color: 'blue', + Size: 5, + Owner: 'Tomoko', + AppraisedValue: 300, +}; +const mockAsset1Buffer = Buffer.from(JSON.stringify(mockAsset1)); + +const mockAsset2 = { + ID: 'asset2', + Color: 'red', + Size: 5, + Owner: 'Brad', + AppraisedValue: 400, +}; + +const mockAllAssetsBuffer = Buffer.from( + JSON.stringify([mockAsset1, mockAsset2]) +); // TODO add tests for server errors -// TODO implement 405 Method Not Allowed where appropriate and add tests describe('Asset Transfer Besic REST API', () => { let app: Application; + let mockJobQueue: MockProxy; beforeEach(async () => { app = await createServer(); + + const mockJob = mock(); + mockJob.id = '1'; + mockJobQueue = mock(); + mockJobQueue.add.mockResolvedValue(mockJob); + app.set('jobq', mockJobQueue); }); describe('/ready', () => { @@ -35,6 +68,31 @@ describe('Asset Transfer Besic REST API', () => { describe('/live', () => { it('GET should respond with 200 OK json', async () => { + const mockBlockchainInfoProto = + fabricProtos.common.BlockchainInfo.create(); + mockBlockchainInfoProto.height = 42; + const mockBlockchainInfoBuffer = Buffer.from( + fabricProtos.common.BlockchainInfo.encode( + mockBlockchainInfoProto + ).finish() + ); + + const mockOrg1QsccContract = mock(); + mockOrg1QsccContract.evaluateTransaction + .calledWith('GetChainInfo') + .mockResolvedValue(mockBlockchainInfoBuffer); + app.set(config.mspIdOrg1, { + qsccContract: mockOrg1QsccContract, + }); + + const mockOrg2QsccContract = mock(); + mockOrg2QsccContract.evaluateTransaction + .calledWith('GetChainInfo') + .mockResolvedValue(mockBlockchainInfoBuffer); + app.set(config.mspIdOrg2, { + qsccContract: mockOrg2QsccContract, + }); + const response = await request(app).get('/live'); expect(response.statusCode).toEqual(200); expect(response.header).toHaveProperty( @@ -49,6 +107,19 @@ describe('Asset Transfer Besic REST API', () => { }); describe('/api/assets', () => { + let mockGetAllAssetsTransaction: MockProxy; + + beforeEach(() => { + mockGetAllAssetsTransaction = mock(); + const mockBasicContract = mock(); + mockBasicContract.createTransaction + .calledWith('GetAllAssets') + .mockReturnValue(mockGetAllAssetsTransaction); + app.set(config.mspIdOrg1, { + assetContract: mockBasicContract, + }); + }); + it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => { const response = await request(app) .get('/api/assets') @@ -66,8 +137,8 @@ describe('Asset Transfer Besic REST API', () => { }); it('GET should respond with an empty json array when there are no assets', async () => { - // NOTE: only the first mocked GetAllAssets with return no assets - // TODO find a better alternative so that test order does not matter + mockGetAllAssetsTransaction.evaluate.mockResolvedValue(Buffer.from('')); + const response = await request(app) .get('/api/assets') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -80,8 +151,10 @@ describe('Asset Transfer Besic REST API', () => { }); it('GET should respond with json array of assets', async () => { - // NOTE: only the second mocked GetAllAssets with return no assets - // TODO find a better alternative so that test order does not matter + mockGetAllAssetsTransaction.evaluate.mockResolvedValue( + mockAllAssetsBuffer + ); + const response = await request(app) .get('/api/assets') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -180,37 +253,34 @@ describe('Asset Transfer Besic REST API', () => { ); expect(response.body).toEqual({ status: 'Accepted', - transactionId: 'txn1', - timestamp: expect.any(String), - }); - }); - - it('POST should respond with 409 conflict json when asset already exists', async () => { - const response = await request(app) - .post('/api/assets') - .send({ - id: 'asset1', - color: 'blue', - size: 5, - owner: 'Tomoko', - appraisedValue: 300, - }) - .set('X-Api-Key', 'ORG1MOCKAPIKEY'); - expect(response.statusCode).toEqual(409); - expect(response.header).toHaveProperty( - 'content-type', - 'application/json; charset=utf-8' - ); - expect(response.body).toEqual({ - status: 'Conflict', - reason: 'ASSET_EXISTS', - message: 'the asset asset1 already exists', + jobId: '1', timestamp: expect.any(String), }); }); }); describe('/api/assets/:id', () => { + let mockAssetExistsTransaction: MockProxy; + let mockReadAssetTransaction: MockProxy; + + beforeEach(() => { + const mockBasicContract = mock(); + + mockAssetExistsTransaction = mock(); + mockBasicContract.createTransaction + .calledWith('AssetExists') + .mockReturnValue(mockAssetExistsTransaction); + + mockReadAssetTransaction = mock(); + mockBasicContract.createTransaction + .calledWith('ReadAsset') + .mockReturnValue(mockReadAssetTransaction); + + app.set(config.mspIdOrg1, { + assetContract: mockBasicContract, + }); + }); + it('OPTIONS should respond with 401 unauthorized json when an invalid API key is specified', async () => { const response = await request(app) .options('/api/assets/asset1') @@ -228,6 +298,10 @@ describe('Asset Transfer Besic REST API', () => { }); it('OPTIONS should respond with 404 not found json without the allow header when there is no asset with the specified ID', async () => { + mockAssetExistsTransaction.evaluate + .calledWith('asset3') + .mockResolvedValue(Buffer.from('false')); + const response = await request(app) .options('/api/assets/asset3') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -244,6 +318,10 @@ describe('Asset Transfer Besic REST API', () => { }); it('OPTIONS should respond with 200 OK json with the allow header', async () => { + mockAssetExistsTransaction.evaluate + .calledWith('asset1') + .mockResolvedValue(Buffer.from('true')); + const response = await request(app) .options('/api/assets/asset1') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -279,6 +357,10 @@ describe('Asset Transfer Besic REST API', () => { }); it('GET should respond with 404 not found json when there is no asset with the specified ID', async () => { + mockReadAssetTransaction.evaluate + .calledWith('asset3') + .mockRejectedValue(new Error('the asset asset3 does not exist')); + const response = await request(app) .get('/api/assets/asset3') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -294,6 +376,10 @@ describe('Asset Transfer Besic REST API', () => { }); it('GET should respond with the asset json when the asset exists', async () => { + mockReadAssetTransaction.evaluate + .calledWith('asset1') + .mockResolvedValue(mockAsset1Buffer); + const response = await request(app) .get('/api/assets/asset1') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -334,28 +420,6 @@ describe('Asset Transfer Besic REST API', () => { }); }); - it('PUT should respond with 404 not found json when there is no asset with the specified ID', async () => { - const response = await request(app) - .put('/api/assets/asset3') - .send({ - id: 'asset3', - color: 'red', - size: 5, - owner: 'Brad', - appraisedValue: 400, - }) - .set('X-Api-Key', 'ORG1MOCKAPIKEY'); - expect(response.statusCode).toEqual(404); - expect(response.header).toHaveProperty( - 'content-type', - 'application/json; charset=utf-8' - ); - expect(response.body).toEqual({ - status: 'Not Found', - timestamp: expect.any(String), - }); - }); - it('PUT should respond with 400 bad request json when IDs do not match', async () => { const response = await request(app) .put('/api/assets/asset1') @@ -429,7 +493,7 @@ describe('Asset Transfer Besic REST API', () => { ); expect(response.body).toEqual({ status: 'Accepted', - transactionId: 'txn1', + jobId: '1', timestamp: expect.any(String), }); }); @@ -451,22 +515,6 @@ describe('Asset Transfer Besic REST API', () => { }); }); - it('PATCH should respond with 404 not found json when there is no asset with the specified ID', async () => { - const response = await request(app) - .patch('/api/assets/asset3') - .send([{ op: 'replace', path: '/owner', value: 'Ashleigh' }]) - .set('X-Api-Key', 'ORG1MOCKAPIKEY'); - expect(response.statusCode).toEqual(404); - expect(response.header).toHaveProperty( - 'content-type', - 'application/json; charset=utf-8' - ); - expect(response.body).toEqual({ - status: 'Not Found', - timestamp: expect.any(String), - }); - }); - it('PATCH should respond with 400 bad request json for invalid patch op/path', async () => { const response = await request(app) .patch('/api/assets/asset1') @@ -505,7 +553,7 @@ describe('Asset Transfer Besic REST API', () => { ); expect(response.body).toEqual({ status: 'Accepted', - transactionId: 'txn1', + jobId: '1', timestamp: expect.any(String), }); }); @@ -526,9 +574,45 @@ describe('Asset Transfer Besic REST API', () => { }); }); - it('DELETE should respond with 404 not found json when there is no asset with the specified ID', async () => { + it('DELETE should respond with 202 accepted json', async () => { const response = await request(app) - .delete('/api/assets/asset3') + .delete('/api/assets/asset1') + .set('X-Api-Key', 'ORG1MOCKAPIKEY'); + expect(response.statusCode).toEqual(202); + expect(response.header).toHaveProperty( + 'content-type', + 'application/json; charset=utf-8' + ); + expect(response.body).toEqual({ + status: 'Accepted', + jobId: '1', + timestamp: expect.any(String), + }); + }); + }); + + describe('/api/jobs/:id', () => { + it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => { + const response = await request(app) + .get('/api/jobs/1') + .set('X-Api-Key', 'NOTTHERIGHTAPIKEY'); + expect(response.statusCode).toEqual(401); + expect(response.header).toHaveProperty( + 'content-type', + 'application/json; charset=utf-8' + ); + expect(response.body).toEqual({ + reason: 'NO_VALID_APIKEY', + status: 'Unauthorized', + timestamp: expect.any(String), + }); + }); + + it('GET should respond with 404 not found json when there is no job with the specified ID', async () => { + mocked(Job.fromId).mockResolvedValue(undefined); + + const response = await request(app) + .get('/api/jobs/3') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); expect(response.statusCode).toEqual(404); expect(response.header).toHaveProperty( @@ -541,24 +625,49 @@ describe('Asset Transfer Besic REST API', () => { }); }); - it('DELETE should respond with 202 accepted json', async () => { + it('GET should respond with json details for the specified job ID', async () => { + const mockJob = mock(); + mockJob.id = '2'; + mockJob.data = { + transactionIds: ['txn1', 'txn2'], + }; + mockJob.returnvalue = { + transactionError: 'Mock error', + transactionPayload: Buffer.from('Mock payload'), + }; + mockJobQueue.getJob.calledWith('2').mockResolvedValue(mockJob); + const response = await request(app) - .delete('/api/assets/asset1') + .get('/api/jobs/2') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); - expect(response.statusCode).toEqual(202); + expect(response.statusCode).toEqual(200); expect(response.header).toHaveProperty( 'content-type', 'application/json; charset=utf-8' ); expect(response.body).toEqual({ - status: 'Accepted', - transactionId: 'txn1', - timestamp: expect.any(String), + jobId: '2', + transactionIds: ['txn1', 'txn2'], + transactionError: 'Mock error', + transactionPayload: 'Mock payload', }); }); }); describe('/api/transactions/:id', () => { + let mockGetTransactionByIDTransaction: MockProxy; + + beforeEach(() => { + mockGetTransactionByIDTransaction = mock(); + const mockQsccContract = mock(); + mockQsccContract.createTransaction + .calledWith('GetTransactionByID') + .mockReturnValue(mockGetTransactionByIDTransaction); + app.set(config.mspIdOrg1, { + qsccContract: mockQsccContract, + }); + }); + it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => { const response = await request(app) .get('/api/transactions/txn1') @@ -576,6 +685,14 @@ describe('Asset Transfer Besic REST API', () => { }); it('GET should respond with 404 not found json when there is no transaction with the specified ID', async () => { + mockGetTransactionByIDTransaction.evaluate + .calledWith('mychannel', 'txn3') + .mockRejectedValue( + new Error( + 'Failed to get transaction with id txn3, error Entry not found in index' + ) + ); + const response = await request(app) .get('/api/transactions/txn3') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -591,6 +708,19 @@ describe('Asset Transfer Besic REST API', () => { }); it('GET should respond with json details for the specified transaction ID', async () => { + const processedTransactionProto = + fabricProtos.protos.ProcessedTransaction.create(); + processedTransactionProto.validationCode = + fabricProtos.protos.TxValidationCode.VALID; + const processedTransactionBuffer = Buffer.from( + fabricProtos.protos.ProcessedTransaction.encode( + processedTransactionProto + ).finish() + ); + mockGetTransactionByIDTransaction.evaluate + .calledWith('mychannel', 'txn2') + .mockResolvedValue(processedTransactionBuffer); + const response = await request(app) .get('/api/transactions/txn2') .set('X-Api-Key', 'ORG1MOCKAPIKEY'); @@ -600,10 +730,8 @@ describe('Asset Transfer Besic REST API', () => { 'application/json; charset=utf-8' ); expect(response.body).toEqual({ - status: 'OK', - progress: 'DONE', + transactionId: 'txn2', validationCode: 'VALID', - timestamp: expect.any(String), }); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/assets.router.ts b/asset-transfer-basic/rest-api-typescript/src/assets.router.ts index 6bac3866..5d6c96f5 100644 --- a/asset-transfer-basic/rest-api-typescript/src/assets.router.ts +++ b/asset-transfer-basic/rest-api-typescript/src/assets.router.ts @@ -1,32 +1,35 @@ /* * SPDX-License-Identifier: Apache-2.0 * - * Note: this sample is intended to work with the basic asset transfer + * This sample is intended to work with the basic asset transfer * chaincode which imposes some constraints on what is possible here. * * For example, * - There is no validation for Asset IDs * - There are no error codes from the chaincode * + * To avoid timeouts, long running tasks should be decoupled from HTTP request + * processing + * + * Submit transactions can potentially be very long running, especially if the + * transaction fails and needs to be retried one or more times + * + * To allow requests to respond quickly enough, this sample queues submit + * requests for processing asynchronously and immediately returns 202 Accepted */ import express, { Request, Response } from 'express'; import { body, validationResult } from 'express-validator'; import { Contract } from 'fabric-network'; import { getReasonPhrase, StatusCodes } from 'http-status-codes'; -import { Redis } from 'ioredis'; -import { AssetExistsError, AssetNotFoundError } from './errors'; -import { evatuateTransaction, submitTransaction } from './fabric'; +import { Queue } from 'bullmq'; +import { AssetNotFoundError } from './errors'; +import { evatuateTransaction } from './fabric'; +import { addSubmitTransactionJob } from './jobs'; import { logger } from './logger'; -const { - ACCEPTED, - BAD_REQUEST, - CONFLICT, - INTERNAL_SERVER_ERROR, - NOT_FOUND, - OK, -} = StatusCodes; +const { ACCEPTED, BAD_REQUEST, INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = + StatusCodes; export const assetsRouter = express.Router(); @@ -45,7 +48,7 @@ assetsRouter.get('/', async (req: Request, res: Response) => { return res.status(OK).json(assets); } catch (err) { - logger.error(err, 'Error processing get all assets request'); + logger.error({ err }, 'Error processing get all assets request'); return res.status(INTERNAL_SERVER_ERROR).json({ status: getReasonPhrase(INTERNAL_SERVER_ERROR), timestamp: new Date().toISOString(), @@ -76,14 +79,12 @@ assetsRouter.post( } const mspId = req.user as string; - const contract = req.app.get(mspId).assetContract as Contract; - const redis = req.app.get('redis') as Redis; const assetId = req.body.id; try { - const transactionId = await submitTransaction( - contract, - redis, + const submitQueue = req.app.get('jobq') as Queue; + const jobId = await addSubmitTransactionJob( + submitQueue, mspId, 'CreateAsset', assetId, @@ -95,26 +96,16 @@ assetsRouter.post( return res.status(ACCEPTED).json({ status: getReasonPhrase(ACCEPTED), - transactionId: transactionId, + jobId: jobId, timestamp: new Date().toISOString(), }); } catch (err) { logger.error( - err, - 'Error processing create asset request for asset ID %s with transaction ID %s', - assetId, - err.transactionId + { err }, + 'Error processing create asset request for asset ID %s', + assetId ); - if (err instanceof AssetExistsError) { - return res.status(CONFLICT).json({ - status: getReasonPhrase(CONFLICT), - reason: 'ASSET_EXISTS', - message: err.message, - timestamp: new Date().toISOString(), - }); - } - return res.status(INTERNAL_SERVER_ERROR).json({ status: getReasonPhrase(INTERNAL_SERVER_ERROR), timestamp: new Date().toISOString(), @@ -152,7 +143,7 @@ assetsRouter.options('/:assetId', async (req: Request, res: Response) => { } } catch (err) { logger.error( - err, + { err }, 'Error processing asset options request for asset ID %s', assetId ); @@ -177,7 +168,7 @@ assetsRouter.get('/:assetId', async (req: Request, res: Response) => { return res.status(OK).json(asset); } catch (err) { logger.error( - err, + { err }, 'Error processing read asset request for asset ID %s', assetId ); @@ -228,14 +219,12 @@ assetsRouter.put( } const mspId = req.user as string; - const contract = req.app.get(mspId).assetContract as Contract; - const redis = req.app.get('redis') as Redis; const assetId = req.params.assetId; try { - const transactionId = await submitTransaction( - contract, - redis, + const submitQueue = req.app.get('jobq') as Queue; + const jobId = await addSubmitTransactionJob( + submitQueue, mspId, 'UpdateAsset', assetId, @@ -247,24 +236,16 @@ assetsRouter.put( return res.status(ACCEPTED).json({ status: getReasonPhrase(ACCEPTED), - transactionId: transactionId, + jobId: jobId, timestamp: new Date().toISOString(), }); } catch (err) { logger.error( - err, - 'Error processing update asset request for asset ID %s with transaction ID %s', - assetId, - err.transactionId + { err }, + 'Error processing update asset request for asset ID %s', + assetId ); - if (err instanceof AssetNotFoundError) { - return res.status(NOT_FOUND).json({ - status: getReasonPhrase(NOT_FOUND), - timestamp: new Date().toISOString(), - }); - } - return res.status(INTERNAL_SERVER_ERROR).json({ status: getReasonPhrase(INTERNAL_SERVER_ERROR), timestamp: new Date().toISOString(), @@ -299,15 +280,13 @@ assetsRouter.patch( } const mspId = req.user as string; - const contract = req.app.get(mspId).assetContract as Contract; - const redis = req.app.get('redis') as Redis; const assetId = req.params.assetId; const newOwner = req.body[0].value; try { - const transactionId = await submitTransaction( - contract, - redis, + const submitQueue = req.app.get('jobq') as Queue; + const jobId = await addSubmitTransactionJob( + submitQueue, mspId, 'TransferAsset', assetId, @@ -316,24 +295,16 @@ assetsRouter.patch( return res.status(ACCEPTED).json({ status: getReasonPhrase(ACCEPTED), - transactionId: transactionId, + jobId: jobId, timestamp: new Date().toISOString(), }); } catch (err) { logger.error( - err, - 'Error processing update asset request for asset ID %s with transaction ID %s', - req.params.assetId, - err.transactionId + { err }, + 'Error processing update asset request for asset ID %s', + req.params.assetId ); - if (err instanceof AssetNotFoundError) { - return res.status(NOT_FOUND).json({ - status: getReasonPhrase(NOT_FOUND), - timestamp: new Date().toISOString(), - }); - } - return res.status(INTERNAL_SERVER_ERROR).json({ status: getReasonPhrase(INTERNAL_SERVER_ERROR), timestamp: new Date().toISOString(), @@ -346,14 +317,12 @@ assetsRouter.delete('/:assetId', async (req: Request, res: Response) => { logger.debug(req.body, 'Delete asset request received'); const mspId = req.user as string; - const contract = req.app.get(mspId).assetContract as Contract; - const redis = req.app.get('redis') as Redis; const assetId = req.params.assetId; try { - const transactionId = await submitTransaction( - contract, - redis, + const submitQueue = req.app.get('jobq') as Queue; + const jobId = await addSubmitTransactionJob( + submitQueue, mspId, 'DeleteAsset', assetId @@ -361,24 +330,16 @@ assetsRouter.delete('/:assetId', async (req: Request, res: Response) => { return res.status(ACCEPTED).json({ status: getReasonPhrase(ACCEPTED), - transactionId: transactionId, + jobId: jobId, timestamp: new Date().toISOString(), }); } catch (err) { logger.error( - err, - 'Error processing delete asset request for asset ID %s with transaction ID %s', - assetId, - err.transactionId + { err }, + 'Error processing delete asset request for asset ID %s', + assetId ); - if (err instanceof AssetNotFoundError) { - return res.status(NOT_FOUND).json({ - status: getReasonPhrase(NOT_FOUND), - timestamp: new Date().toISOString(), - }); - } - return res.status(INTERNAL_SERVER_ERROR).json({ status: getReasonPhrase(INTERNAL_SERVER_ERROR), timestamp: new Date().toISOString(), diff --git a/asset-transfer-basic/rest-api-typescript/src/config.spec.ts b/asset-transfer-basic/rest-api-typescript/src/config.spec.ts index fdd5d91e..f095c1b5 100644 --- a/asset-transfer-basic/rest-api-typescript/src/config.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/config.spec.ts @@ -60,46 +60,156 @@ describe('Config values', () => { }); }); - describe('retryDelay', () => { - it('defaults to "3000"', () => { + describe('submitJobBackoffType', () => { + it('defaults to "fixed"', () => { const config = require('./config'); - expect(config.retryDelay).toBe(3000); + expect(config.submitJobBackoffType).toBe('fixed'); }); - it('can be configured using the "RETRY_DELAY" environment variable', () => { - process.env.RETRY_DELAY = '9999'; + it('can be configured using the "SUBMIT_JOB_BACKOFF_TYPE" environment variable', () => { + process.env.SUBMIT_JOB_BACKOFF_TYPE = 'exponential'; const config = require('./config'); - expect(config.retryDelay).toBe(9999); + expect(config.submitJobBackoffType).toBe('exponential'); }); - it('throws an error when the "RETRY_DELAY" environment variable has an invalid number', () => { - process.env.RETRY_DELAY = 'short'; + it('throws an error when the "LOG_LEVEL" environment variable has an invalid log level', () => { + process.env.SUBMIT_JOB_BACKOFF_TYPE = 'jitter'; expect(() => { require('./config'); }).toThrow( - 'env-var: "RETRY_DELAY" should be a valid integer. An example of a valid value would be: 3000' + 'env-var: "SUBMIT_JOB_BACKOFF_TYPE" should be one of [fixed, exponential]' ); }); }); - describe('maxRetryCount', () => { - it('defaults to "5"', () => { + describe('submitJobBackoffDelay', () => { + it('defaults to "3000"', () => { const config = require('./config'); - expect(config.maxRetryCount).toBe(5); + expect(config.submitJobBackoffDelay).toBe(3000); }); - it('can be configured using the "MAX_RETRY_COUNT" environment variable', () => { - process.env.MAX_RETRY_COUNT = '9999'; + it('can be configured using the "SUBMIT_JOB_BACKOFF_DELAY" environment variable', () => { + process.env.SUBMIT_JOB_BACKOFF_DELAY = '9999'; const config = require('./config'); - expect(config.maxRetryCount).toBe(9999); + expect(config.submitJobBackoffDelay).toBe(9999); }); - it('throws an error when the "MAX_RETRY_COUNT" environment variable has an invalid number', () => { - process.env.MAX_RETRY_COUNT = 'lots'; + it('throws an error when the "SUBMIT_JOB_BACKOFF_DELAY" environment variable has an invalid number', () => { + process.env.SUBMIT_JOB_BACKOFF_DELAY = 'short'; expect(() => { require('./config'); }).toThrow( - 'env-var: "MAX_RETRY_COUNT" should be a valid integer. An example of a valid value would be: 5' + 'env-var: "SUBMIT_JOB_BACKOFF_DELAY" should be a valid integer. An example of a valid value would be: 3000' + ); + }); + }); + + describe('submitJobAttempts', () => { + it('defaults to "5"', () => { + const config = require('./config'); + expect(config.submitJobAttempts).toBe(5); + }); + + it('can be configured using the "SUBMIT_JOB_ATTEMPTS" environment variable', () => { + process.env.SUBMIT_JOB_ATTEMPTS = '9999'; + const config = require('./config'); + expect(config.submitJobAttempts).toBe(9999); + }); + + it('throws an error when the "SUBMIT_JOB_ATTEMPTS" environment variable has an invalid number', () => { + process.env.SUBMIT_JOB_ATTEMPTS = 'lots'; + expect(() => { + require('./config'); + }).toThrow( + 'env-var: "SUBMIT_JOB_ATTEMPTS" should be a valid integer. An example of a valid value would be: 5' + ); + }); + }); + + describe('submitJobConcurrency', () => { + it('defaults to "5"', () => { + const config = require('./config'); + expect(config.submitJobConcurrency).toBe(5); + }); + + it('can be configured using the "SUBMIT_JOB_CONCURRENCY" environment variable', () => { + process.env.SUBMIT_JOB_CONCURRENCY = '9999'; + const config = require('./config'); + expect(config.submitJobConcurrency).toBe(9999); + }); + + it('throws an error when the "SUBMIT_JOB_CONCURRENCY" environment variable has an invalid number', () => { + process.env.SUBMIT_JOB_CONCURRENCY = 'lots'; + expect(() => { + require('./config'); + }).toThrow( + 'env-var: "SUBMIT_JOB_CONCURRENCY" should be a valid integer. An example of a valid value would be: 5' + ); + }); + }); + + describe('maxCompletedSubmitJobs', () => { + it('defaults to "1000"', () => { + const config = require('./config'); + expect(config.maxCompletedSubmitJobs).toBe(1000); + }); + + it('can be configured using the "MAX_COMPLETED_SUBMIT_JOBS" environment variable', () => { + process.env.MAX_COMPLETED_SUBMIT_JOBS = '9999'; + const config = require('./config'); + expect(config.maxCompletedSubmitJobs).toBe(9999); + }); + + it('throws an error when the "MAX_COMPLETED_SUBMIT_JOBS" environment variable has an invalid number', () => { + process.env.MAX_COMPLETED_SUBMIT_JOBS = 'lots'; + expect(() => { + require('./config'); + }).toThrow( + 'env-var: "MAX_COMPLETED_SUBMIT_JOBS" should be a valid integer. An example of a valid value would be: 1000' + ); + }); + }); + + describe('maxFailedSubmitJobs', () => { + it('defaults to "1000"', () => { + const config = require('./config'); + expect(config.maxFailedSubmitJobs).toBe(1000); + }); + + it('can be configured using the "MAX_FAILED_SUBMIT_JOBS" environment variable', () => { + process.env.MAX_FAILED_SUBMIT_JOBS = '9999'; + const config = require('./config'); + expect(config.maxFailedSubmitJobs).toBe(9999); + }); + + it('throws an error when the "MAX_FAILED_SUBMIT_JOBS" environment variable has an invalid number', () => { + process.env.MAX_FAILED_SUBMIT_JOBS = 'lots'; + expect(() => { + require('./config'); + }).toThrow( + 'env-var: "MAX_FAILED_SUBMIT_JOBS" should be a valid integer. An example of a valid value would be: 1000' + ); + }); + }); + + describe('submitJobQueueScheduler', () => { + it('defaults to "true"', () => { + const config = require('./config'); + expect(config.submitJobQueueScheduler).toBe(true); + }); + + it('can be configured using the "SUBMIT_JOB_QUEUE_SCHEDULER" environment variable', () => { + process.env.SUBMIT_JOB_QUEUE_SCHEDULER = 'false'; + const config = require('./config'); + expect(config.submitJobQueueScheduler).toBe(false); + }); + + it('throws an error when the "SUBMIT_JOB_QUEUE_SCHEDULER" environment variable has an invalid boolean value', () => { + process.env.SUBMIT_JOB_QUEUE_SCHEDULER = '11'; + expect(() => { + require('./config'); + }).toThrow( + 'env-var: "SUBMIT_JOB_QUEUE_SCHEDULER" should be either "true", "false", "TRUE", or "FALSE". An example of a valid value would be: true' ); }); }); @@ -152,28 +262,6 @@ describe('Config values', () => { }); }); - describe('blockListenerOrg', () => { - it('defaults to "Org1"', () => { - const config = require('./config'); - expect(config.blockListenerOrg).toBe('Org1'); - }); - - it('can be configured using the "HLF_BLOCK_LISTENER_ORG" environment variable', () => { - process.env.HLF_BLOCK_LISTENER_ORG = 'Org2'; - const config = require('./config'); - expect(config.blockListenerOrg).toBe('Org2'); - }); - - it('throws an error when the "HLF_BLOCK_LISTENER_ORG" environment variable has an invalid value', () => { - process.env.HLF_BLOCK_LISTENER_ORG = 'Org3'; - expect(() => { - require('./config'); - }).toThrow( - 'env-var: "HLF_BLOCK_LISTENER_ORG" should be one of [Org1, Org2]' - ); - }); - }); - describe('channelName', () => { it('defaults to "mychannel"', () => { const config = require('./config'); diff --git a/asset-transfer-basic/rest-api-typescript/src/config.ts b/asset-transfer-basic/rest-api-typescript/src/config.ts index 47c0f909..a15c0cec 100644 --- a/asset-transfer-basic/rest-api-typescript/src/config.ts +++ b/asset-transfer-basic/rest-api-typescript/src/config.ts @@ -7,6 +7,8 @@ import * as env from 'env-var'; export const ORG1 = 'Org1'; export const ORG2 = 'Org2'; +export const JOB_QUEUE_NAME = 'submit'; + /* * Log level for the REST server */ @@ -25,23 +27,69 @@ export const port = env .asPortNumber(); /* - * The delay between each retry attempt in milliseconds + * The type of backoff to use for retrying failed submit jobs */ -export const retryDelay = env - .get('RETRY_DELAY') +export const submitJobBackoffType = env + .get('SUBMIT_JOB_BACKOFF_TYPE') + .default('fixed') + .asEnum(['fixed', 'exponential']); + +/* + * Backoff delay for retrying failed submit jobs in milliseconds + */ +export const submitJobBackoffDelay = env + .get('SUBMIT_JOB_BACKOFF_DELAY') .default('3000') .example('3000') .asIntPositive(); /* - * The maximum number of times to retry a failing transaction + * The total number of attempts to try a submit job until it completes */ -export const maxRetryCount = env - .get('MAX_RETRY_COUNT') +export const submitJobAttempts = env + .get('SUBMIT_JOB_ATTEMPTS') .default('5') .example('5') .asIntPositive(); +/* + * The maximum number of submit jobs that can be processed in parallel + */ +export const submitJobConcurrency = env + .get('SUBMIT_JOB_CONCURRENCY') + .default('5') + .example('5') + .asIntPositive(); + +/* + * The number of completed submit jobs to keep + */ +export const maxCompletedSubmitJobs = env + .get('MAX_COMPLETED_SUBMIT_JOBS') + .default('1000') + .example('1000') + .asIntPositive(); + +/* + * The number of failed submit jobs to keep + */ +export const maxFailedSubmitJobs = env + .get('MAX_FAILED_SUBMIT_JOBS') + .default('1000') + .example('1000') + .asIntPositive(); + +/* + * Whether to initialise a scheduler for the submit job queue + * There must be at least on queue scheduler to handle retries and you may want + * more than one for redundancy + */ +export const submitJobQueueScheduler = env + .get('SUBMIT_JOB_QUEUE_SCHEDULER') + .default('true') + .example('true') + .asBoolStrict(); + /* * Whether to convert discovered host addresses to be 'localhost' * This should be set to 'true' when running a docker composed fabric network on the @@ -71,14 +119,6 @@ export const mspIdOrg2 = env .example(`${ORG2}MSP`) .asString(); -/* - * The block listener org - */ -export const blockListenerOrg = env - .get('HLF_BLOCK_LISTENER_ORG') - .default(ORG1) - .asEnum([ORG1, ORG2]); - /* * Name of the channel which the basic asset sample chaincode has been installed on */ @@ -205,7 +245,7 @@ export const redisPort = env */ export const redisUsername = env .get('REDIS_USERNAME') - .example('conga') + .example('fabric') .asString(); /* diff --git a/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts b/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts index a0813c90..6ef490f6 100644 --- a/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/errors.spec.ts @@ -5,15 +5,55 @@ import { AssetExistsError, AssetNotFoundError, - TransactionError, TransactionNotFoundError, handleError, isDuplicateTransactionError, + isErrorLike, } from './errors'; describe('Errors', () => { + describe('isErrorLike', () => { + it('returns false for null', () => { + expect(isErrorLike(null)).toBe(false); + }); + + it('returns false for undefined', () => { + expect(isErrorLike(undefined)).toBe(false); + }); + + it('returns false for empty object', () => { + expect(isErrorLike({})).toBe(false); + }); + + it('returns false for string', () => { + expect(isErrorLike('true')).toBe(false); + }); + + it('returns false for non-error object', () => { + expect(isErrorLike({ size: 42 })).toBe(false); + }); + + it('returns false for invalid error object', () => { + expect(isErrorLike({ name: 'MockError', message: 42 })).toBe(false); + }); + + it('returns false for error like object with invalid stack', () => { + expect( + isErrorLike({ name: 'MockError', message: 'Fail', stack: false }) + ).toBe(false); + }); + + it('returns true for error like object', () => { + expect(isErrorLike({ name: 'MockError', message: 'Fail' })).toBe(true); + }); + + it('returns true for new Error', () => { + expect(isErrorLike(new Error('Error'))).toBe(true); + }); + }); + describe('isDuplicateTransactionError', () => { - it('returns true for an error with duplicate transaction endorsement details', () => { + it('returns true for an error when all endorsement details are duplicate transaction found', () => { const mockDuplicateTransactionError = { errors: [ { @@ -21,6 +61,36 @@ describe('Errors', () => { { details: 'duplicate transaction found', }, + { + details: 'duplicate transaction found', + }, + { + details: 'duplicate transaction found', + }, + ], + }, + ], + }; + + expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe( + true + ); + }); + + it('returns true for an error when at least one endorsement details are duplicate transaction found', () => { + const mockDuplicateTransactionError = { + errors: [ + { + endorsements: [ + { + details: 'duplicate transaction found', + }, + { + details: 'mock endorsement details', + }, + { + details: 'mock endorsement details', + }, ], }, ], @@ -39,6 +109,9 @@ describe('Errors', () => { { details: 'mock endorsement details', }, + { + details: 'mock endorsement details', + }, ], }, ], @@ -48,6 +121,38 @@ describe('Errors', () => { false ); }); + + it('returns false for an error without endorsement details', () => { + const mockDuplicateTransactionError = { + errors: [ + { + rejections: [ + { + details: 'duplicate transaction found', + }, + ], + }, + ], + }; + + expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe( + false + ); + }); + + it('returns false for a basic Error object without endorsement details', () => { + expect( + isDuplicateTransactionError(new Error('duplicate transaction found')) + ).toBe(false); + }); + + it('returns false for an undefined error', () => { + expect(isDuplicateTransactionError(undefined)).toBe(false); + }); + + it('returns false for a null error', () => { + expect(isDuplicateTransactionError(null)).toBe(false); + }); }); describe('handleError', () => { @@ -77,25 +182,27 @@ describe('Errors', () => { } ); - it('returns a TransactionNotFoundError for errors with a transaction not found message', () => { - expect( - handleError( - 'txn1', - new Error( - 'Failed to get transaction with id txn, error Entry not found in index' - ) - ) - ).toStrictEqual( - new TransactionNotFoundError( - 'Failed to get transaction with id txn, error Entry not found in index', - 'txn1' - ) + it.each([ + 'Failed to get transaction with id txn, error Entry not found in index', + 'Failed to get transaction with id txn, error no such transaction ID [txn] in index', + ])( + 'returns a TransactionNotFoundError for errors with a transaction not found message: %s', + (msg) => { + expect(handleError('txn1', new Error(msg))).toStrictEqual( + new TransactionNotFoundError(msg, 'txn1') + ); + } + ); + + it('returns the original error for errors with other messages', () => { + expect(handleError('txn1', new Error('MOCK ERROR'))).toStrictEqual( + new Error('MOCK ERROR') ); }); - it('returns a TransactionError for errors with other messages', () => { - expect(handleError('txn1', new Error('MOCK ERROR'))).toStrictEqual( - new TransactionError('Transaction error', 'txn1') + it('returns a new Error object for errors of other types', () => { + expect(handleError('txn1', 42)).toStrictEqual( + new Error('Unhandled error: 42') ); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/errors.ts b/asset-transfer-basic/rest-api-typescript/src/errors.ts index 5a1fde2e..89eaaa6b 100644 --- a/asset-transfer-basic/rest-api-typescript/src/errors.ts +++ b/asset-transfer-basic/rest-api-typescript/src/errors.ts @@ -4,23 +4,23 @@ import { logger } from './logger'; -export class TransactionError extends Error { +export class ContractError extends Error { transactionId: string; constructor(message: string, transactionId: string) { super(message); - Object.setPrototypeOf(this, TransactionError.prototype); + Object.setPrototypeOf(this, ContractError.prototype); this.name = 'TransactionError'; this.transactionId = transactionId; } } -export class TransactionNotFoundError extends Error { +export class TransactionNotFoundError extends ContractError { transactionId: string; constructor(message: string, transactionId: string) { - super(message); + super(message, transactionId); Object.setPrototypeOf(this, TransactionNotFoundError.prototype); this.name = 'TransactionNotFoundError'; @@ -28,7 +28,7 @@ export class TransactionNotFoundError extends Error { } } -export class AssetExistsError extends TransactionError { +export class AssetExistsError extends ContractError { constructor(message: string, transactionId: string) { super(message, transactionId); Object.setPrototypeOf(this, AssetExistsError.prototype); @@ -37,7 +37,7 @@ export class AssetExistsError extends TransactionError { } } -export class AssetNotFoundError extends TransactionError { +export class AssetNotFoundError extends ContractError { constructor(message: string, transactionId: string) { super(message, transactionId); Object.setPrototypeOf(this, AssetNotFoundError.prototype); @@ -46,6 +46,29 @@ export class AssetNotFoundError extends TransactionError { } } +export class JobNotFoundError extends Error { + jobId: string; + + constructor(message: string, jobId: string) { + super(message); + Object.setPrototypeOf(this, JobNotFoundError.prototype); + + this.name = 'JobNotFoundError'; + this.jobId = jobId; + } +} + +export const isErrorLike = (err: unknown): err is Error => { + return ( + err != undefined && + err != null && + typeof (err as Error).name === 'string' && + typeof (err as Error).message === 'string' && + ((err as Error).stack === undefined || + typeof (err as Error).stack === 'string') + ); +}; + /* * Checks whether an error was caused by a duplicate transaction. * @@ -54,19 +77,103 @@ export class AssetNotFoundError extends TransactionError { * DUPLICATE_TXID TxValidationCode somehow but that does not seem to be * possible. */ -export const isDuplicateTransactionError = (error: { - errors: { endorsements: { details: string }[] }[]; -}): boolean => { - try { - const isDuplicateTxn = error?.errors?.some((err) => - err?.endorsements?.some((endorsement) => - endorsement?.details?.startsWith('duplicate transaction found') - ) - ); +export const isDuplicateTransactionError = (err: unknown): boolean => { + if (err === undefined || err === null) return false; - return isDuplicateTxn; - } catch (err) { - logger.warn(err, 'Error checking for duplicate transaction'); + const endorsementError = err as { + errors: { endorsements: { details: string }[] }[]; + }; + + const isDuplicate = endorsementError?.errors?.some((err) => + err?.endorsements?.some((endorsement) => + endorsement?.details?.startsWith('duplicate transaction found') + ) + ); + + return isDuplicate === true; +}; + +/* + * Matches asset already exists error strings from the asset contract + * + * The regex needs to match the following error messages: + * "the asset %s already exists" + * "The asset ${id} already exists" + * "Asset %s already exists" + */ +const matchAssetAlreadyExistsMessage = (message: string): string | null => { + // + const assetAlreadyExistsRegex = /([tT]he )?[aA]sset \w* already exists/g; + const assetAlreadyExistsMatch = message.match(assetAlreadyExistsRegex); + logger.debug( + { message: message, result: assetAlreadyExistsMatch }, + 'Checking for asset already exists message' + ); + + if (assetAlreadyExistsMatch !== null) { + return assetAlreadyExistsMatch[0]; + } + + return null; +}; + +/* + * Matches asset does not exist error strings from the asset contract + * + * The regex needs to match the following error messages: + * "the asset %s does not exist" + * "The asset ${id} does not exist" + * "Asset %s does not exist" + */ +const matchAssetDoesNotExistMessage = (message: string): string | null => { + const assetDoesNotExistRegex = /([tT]he )?[aA]sset \w* does not exist/g; + const assetDoesNotExistMatch = message.match(assetDoesNotExistRegex); + logger.debug( + { message: message, result: assetDoesNotExistMatch }, + 'Checking for asset does not exist message' + ); + + if (assetDoesNotExistMatch !== null) { + return assetDoesNotExistMatch[0]; + } + + return null; +}; + +/* + * Matches transaction does not exist error strings from the contract API + * + * The regex needs to match the following error messages: + * "Failed to get transaction with id %s, error Entry not found in index" + * "Failed to get transaction with id %s, error no such transaction ID [%s] in index" + */ +const matchTransactionDoesNotExistMessage = ( + message: string +): string | null => { + const transactionDoesNotExistRegex = + /Failed to get transaction with id [^,]*, error (?:(?:Entry not found)|(?:no such transaction ID \[[^\]]*\])) in index/g; + const transactionDoesNotExistMatch = message.match( + transactionDoesNotExistRegex + ); + logger.debug( + { message: message, result: transactionDoesNotExistMatch }, + 'Checking for transaction does not exist message' + ); + + if (transactionDoesNotExistMatch !== null) { + return transactionDoesNotExistMatch[0]; + } + + return null; +}; + +export const isContractError = (err: unknown): boolean => { + if ( + err instanceof AssetExistsError || + err instanceof AssetNotFoundError || + err instanceof TransactionNotFoundError + ) { + return true; } return false; @@ -80,56 +187,32 @@ export const isDuplicateTransactionError = (error: { * again it's the only option. The error message text is not even the same for * the Go, Java, and Javascript implementations of the chaincode! */ -export const handleError = (transactionId: string, err: Error): Error => { - // This regex needs to match the following error messages: - // "the asset %s already exists" - // "The asset ${id} already exists" - // "Asset %s already exists" - const assetAlreadyExistsRegex = /([tT]he )?[aA]sset \w* already exists/g; - const assetAlreadyExistsMatch = err.message.match(assetAlreadyExistsRegex); - logger.debug( - { message: err.message, result: assetAlreadyExistsMatch }, - 'Checking for asset already exists message' - ); - if (assetAlreadyExistsMatch) { - return new AssetExistsError(assetAlreadyExistsMatch[0], transactionId); - } +export const handleError = (transactionId: string, err: unknown): Error => { + logger.debug({ transactionId: transactionId, err }, 'Processing error'); - // This regex needs to match the following error messages: - // "the asset %s does not exist" - // "The asset ${id} does not exist" - // "Asset %s does not exist" - const assetDoesNotExistRegex = /([tT]he )?[aA]sset \w* does not exist/g; - const assetDoesNotExistMatch = err.message.match(assetDoesNotExistRegex); - logger.debug( - { message: err.message, result: assetDoesNotExistMatch }, - 'Checking for asset does not exist message' - ); - if (assetDoesNotExistMatch) { - return new AssetNotFoundError(assetDoesNotExistMatch[0], transactionId); - } + if (isErrorLike(err)) { + const assetAlreadyExistsMatch = matchAssetAlreadyExistsMessage(err.message); + if (assetAlreadyExistsMatch !== null) { + return new AssetExistsError(assetAlreadyExistsMatch, transactionId); + } - // This regex needs to match the following error messages: - // "Failed to get transaction with id %s, error Entry not found in index" - const transactionDoesNotExistRegex = - /Failed to get transaction with id [^,]*, error Entry not found in index/g; - const transactionDoesNotExistMatch = err.message.match( - transactionDoesNotExistRegex - ); - logger.debug( - { message: err.message, result: transactionDoesNotExistMatch }, - 'Checking for transaction does not exist message' - ); - if (transactionDoesNotExistMatch) { - return new TransactionNotFoundError( - transactionDoesNotExistMatch[0], - transactionId + const assetDoesNotExistMatch = matchAssetDoesNotExistMessage(err.message); + if (assetDoesNotExistMatch !== null) { + return new AssetNotFoundError(assetDoesNotExistMatch, transactionId); + } + + const transactionDoesNotExistMatch = matchTransactionDoesNotExistMessage( + err.message ); + if (transactionDoesNotExistMatch !== null) { + return new TransactionNotFoundError( + transactionDoesNotExistMatch, + transactionId + ); + } + + return err; } - logger.error( - { transactionId: transactionId, error: err }, - 'Unhandled transaction error' - ); - return new TransactionError('Transaction error', transactionId); + return new Error(`Unhandled error: ${err}`); }; diff --git a/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts b/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts index a551dac9..34e6f2f0 100644 --- a/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/fabric.spec.ts @@ -10,69 +10,49 @@ import { evatuateTransaction, submitTransaction, getBlockHeight, - startRetryLoop, - blockEventHandler, + getTransactionValidationCode, + processSubmitTransactionJob, } from './fabric'; import * as config from './config'; import { AssetExistsError, AssetNotFoundError, - TransactionError, TransactionNotFoundError, } from './errors'; import { - BlockEvent, Contract, Gateway, GatewayOptions, Network, Transaction, - TransactionEvent, Wallet, } from 'fabric-network'; import * as fabricProtos from 'fabric-protos'; import { MockProxy, mock } from 'jest-mock-extended'; -import IORedis, { Redis } from 'ioredis'; import Long from 'long'; +import { Job } from 'bullmq'; jest.mock('./config'); +jest.mock('fabric-network', () => { + type FabricNetworkModule = jest.Mocked; + const originalModule: FabricNetworkModule = + jest.requireActual('fabric-network'); + const mockModule: FabricNetworkModule = + jest.createMockFromModule('fabric-network'); + + return { + __esModule: true, + ...mockModule, + Wallets: originalModule.Wallets, + }; +}); jest.mock('ioredis', () => require('ioredis-mock/jest')); describe('Fabric', () => { - const mockTransactionId = - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - const mockKey = `txn:${mockTransactionId}`; - const mockMspId = 'Org1MSP'; - const mockState = Buffer.from( - `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}` - ); - const mockArgs = '["test111","red",400,"Jean",101]'; - const mockTimestamp = 1628078044362; - - const addMockTransationDetails = async (redis: Redis) => { - await redis - .multi() - .hset( - mockKey, - 'mspId', - mockMspId, - 'state', - mockState, - 'args', - mockArgs, - 'timestamp', - mockTimestamp, - 'retries', - '0' - ) - .zadd('index:txn:timestamp', mockTimestamp, mockTransactionId) - .exec(); - }; - describe('createWallet', () => { it('creates a wallet containing identities for both orgs', async () => { const wallet = await createWallet(); @@ -137,162 +117,130 @@ describe('Fabric', () => { }); }); - describe('startRetryLoop', () => { - let redis: Redis; + describe('processSubmitTransactionJob', () => { + const mockContracts = new Map(); + const mockPayload = Buffer.from('MOCK PAYLOAD'); + const mockSavedState = Buffer.from('MOCK SAVED STATE'); let mockTransaction: MockProxy; let mockContract: MockProxy; - let mockContracts: Map; - - const flushPromises = () => { - jest.useRealTimers(); - return new Promise((resolve) => setImmediate(resolve)); - }; + let mockJob: MockProxy; beforeEach(() => { - const redisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, + mockTransaction = mock(); + mockTransaction.getTransactionId.mockReturnValue('mockTransactionId'); + + mockContract = mock(); + mockContract.createTransaction + .calledWith('txn') + .mockReturnValue(mockTransaction); + mockContract.deserializeTransaction + .calledWith(mockSavedState) + .mockReturnValue(mockTransaction); + mockContracts.set('mockMspid', mockContract); + + mockJob = mock(); + }); + + it('gets job result with no error or payload if no contract is available for the required mspid', async () => { + mockJob.data = { + mspid: 'missingMspid', }; - redis = new IORedis(redisOptions) as unknown as Redis; + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob + ); - mockTransaction = mock(); + expect(jobResult).toStrictEqual({ + transactionError: undefined, + transactionPayload: undefined, + }); + }); + + it('gets a job result containing a payload if the transaction was successful first time', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + }; mockTransaction.submit - .mockResolvedValue(Buffer.from('MOCK PAYLOAD')) - .mockName('submit'); - mockContract = mock(); - mockContract.deserializeTransaction.mockReturnValue(mockTransaction); - mockContracts = new Map(); - mockContracts.set(mockMspId, mockContract); + .calledWith('arg1', 'arg2') + .mockResolvedValue(mockPayload); - jest.useFakeTimers(); - }); - - afterEach(() => { - jest.useRealTimers(); - }); - - it('starts a retry loop which does nothing if there are no saved transaction details', async () => { - const getContractSpy = jest.spyOn(mockContracts, 'get'); - - startRetryLoop(mockContracts, redis); - jest.runOnlyPendingTimers(); - await flushPromises(); - - expect(getContractSpy).not.toBeCalled(); - }); - - it('starts a retry loop which clears the saved details after succesfully retrying a transaction', async () => { - addMockTransationDetails(redis); - - startRetryLoop(mockContracts, redis); - jest.runOnlyPendingTimers(); - await flushPromises(); - - expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); - expect(mockTransaction.submit).toBeCalledWith( - 'test111', - 'red', - 400, - 'Jean', - 101 + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob ); - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([]); + expect(jobResult).toStrictEqual({ + transactionError: undefined, + transactionPayload: Buffer.from('MOCK PAYLOAD'), + }); }); - it('starts a retry loop which increments the retry count when a transaction fails', async () => { - addMockTransationDetails(redis); - mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); + it('gets a job result containing a payload if the transaction was successfully rerun using saved transaction state', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + transactionState: mockSavedState, + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockResolvedValue(mockPayload); - startRetryLoop(mockContracts, redis); - jest.runOnlyPendingTimers(); - await flushPromises(); - - expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); - expect(mockTransaction.submit).toBeCalledWith( - 'test111', - 'red', - 400, - 'Jean', - 101 + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob ); - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([ - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95', - ]); - - const savedTransaction = await (redis as Redis).hgetall(mockKey); - expect(savedTransaction.retries).toBe('1'); + expect(jobResult).toStrictEqual({ + transactionError: undefined, + transactionPayload: Buffer.from('MOCK PAYLOAD'), + }); }); - it('starts a retry loop which clears the saved details when a transaction fails as a duplicate', async () => { - addMockTransationDetails(redis); - const mockDuplicateTransactionError = new Error('MOCK ERROR'); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (mockDuplicateTransactionError as any).errors = [ - { - endorsements: [ - { - details: 'duplicate transaction found', - }, - ], - }, - ]; - mockTransaction.submit.mockRejectedValue(mockDuplicateTransactionError); + it('gets a job result containing an error message if the transaction fails but cannot be retried', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + transactionState: mockSavedState, + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockRejectedValue( + new Error( + 'Failed to get transaction with id txn, error Entry not found in index' + ) + ); - startRetryLoop(mockContracts, redis); - jest.runOnlyPendingTimers(); - await flushPromises(); - - expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); - expect(mockTransaction.submit).toBeCalledWith( - 'test111', - 'red', - 400, - 'Jean', - 101 + const jobResult = await processSubmitTransactionJob( + mockContracts, + mockJob ); - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([]); + expect(jobResult).toStrictEqual({ + transactionError: + 'TransactionNotFoundError: Failed to get transaction with id txn, error Entry not found in index', + transactionPayload: undefined, + }); }); - it('starts a retry loop which clears the saved details when a transaction fails the final attempt', async () => { - addMockTransationDetails(redis); - await (redis as Redis).hincrby(mockKey, 'retries', 5); - mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); + it('throws an error if the transaction fails but can be retried', async () => { + mockJob.data = { + mspid: 'mockMspid', + transactionName: 'txn', + transactionArgs: ['arg1', 'arg2'], + transactionState: mockSavedState, + }; + mockTransaction.submit + .calledWith('arg1', 'arg2') + .mockRejectedValue(new Error('MOCK ERROR')); - startRetryLoop(mockContracts, redis); - jest.runOnlyPendingTimers(); - await flushPromises(); - - expect(mockContract.deserializeTransaction).toBeCalledWith(mockState); - expect(mockTransaction.submit).toBeCalledWith( - 'test111', - 'red', - 400, - 'Jean', - 101 - ); - - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([]); - }); - - it('starts a retry loop which clears the saved details when no contract exist for the org', async () => { - addMockTransationDetails(redis); - mockContracts = new Map(); - startRetryLoop(mockContracts, redis); - jest.runOnlyPendingTimers(); - await flushPromises(); - - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([]); + await expect(async () => { + await processSubmitTransactionJob(mockContracts, mockJob); + }).rejects.toThrow('MOCK ERROR'); }); }); @@ -352,96 +300,65 @@ describe('Fabric', () => { }).rejects.toThrow(TransactionNotFoundError); }); - it('throws a TransactionError for other errors', async () => { + it('throws an Error for other errors', async () => { mockTransaction.evaluate.mockRejectedValue(new Error('MOCK ERROR')); - await expect(async () => { await evatuateTransaction(mockContract, 'txn', 'arga', 'argb'); - }).rejects.toThrow(TransactionError); + }).rejects.toThrow(Error); }); }); describe('submitTransaction', () => { - let redis: Redis; - const mockPayload = Buffer.from('MOCK PAYLOAD'); let mockTransaction: MockProxy; - let mockContract: MockProxy; - - beforeEach(async () => { - const redisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, - }; - - redis = new IORedis(redisOptions) as unknown as Redis; + beforeEach(() => { mockTransaction = mock(); - mockTransaction.submit.mockResolvedValue(mockPayload); - mockTransaction.getTransactionId.mockReturnValue('MOCK TXN ID'); - mockTransaction.serialize.mockReturnValue(Buffer.from('MOCK TXN STATE')); - mockContract = mock(); - mockContract.createTransaction - .calledWith('txn') - .mockReturnValue(mockTransaction); }); - it('gets the transaction ID of the submitted transaction', async () => { + it('gets the result of submitting a transaction', async () => { + const mockPayload = Buffer.from('MOCK PAYLOAD'); + mockTransaction.submit.mockResolvedValue(mockPayload); + const result = await submitTransaction( - mockContract, - redis, - 'mspid', + mockTransaction, 'txn', 'arga', 'argb' ); - expect(result).toBe('MOCK TXN ID'); + expect(result.toString()).toBe(mockPayload.toString()); }); - it.each([ - 'the asset GOCHAINCODE already exists', - 'Asset JAVACHAINCODE already exists', - 'The asset JSCHAINCODE already exists', - ])( - 'throws an AssetExistsError an asset already exists error occurs: %s', - async (msg) => { - mockTransaction.submit.mockRejectedValue(new Error(msg)); + it('throws an AssetExistsError an asset already exists error occurs', async () => { + mockTransaction.submit.mockRejectedValue( + new Error('The asset JSCHAINCODE already exists') + ); - await expect(async () => { - await submitTransaction( - mockContract, - redis, - 'mspid', - 'txn', - 'arga', - 'argb' - ); - }).rejects.toThrow(AssetExistsError); - } - ); + await expect(async () => { + await submitTransaction( + mockTransaction, + 'mspid', + 'txn', + 'arga', + 'argb' + ); + }).rejects.toThrow(AssetExistsError); + }); - it.each([ - 'the asset GOCHAINCODE does not exist', - 'Asset JAVACHAINCODE does not exist', - 'The asset JSCHAINCODE does not exist', - ])( - 'throws an AssetNotFoundError if an asset does not exist error occurs: %s', - async (msg) => { - mockTransaction.submit.mockRejectedValue(new Error(msg)); + it('throws an AssetNotFoundError if an asset does not exist error occurs', async () => { + mockTransaction.submit.mockRejectedValue( + new Error('The asset JSCHAINCODE does not exist') + ); - await expect(async () => { - await submitTransaction( - mockContract, - redis, - 'mspid', - 'txn', - 'arga', - 'argb' - ); - }).rejects.toThrow(AssetNotFoundError); - } - ); + await expect(async () => { + await submitTransaction( + mockTransaction, + 'mspid', + 'txn', + 'arga', + 'argb' + ); + }).rejects.toThrow(AssetNotFoundError); + }); it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => { mockTransaction.submit.mockRejectedValue( @@ -452,8 +369,7 @@ describe('Fabric', () => { await expect(async () => { await submitTransaction( - mockContract, - redis, + mockTransaction, 'mspid', 'txn', 'arga', @@ -462,76 +378,42 @@ describe('Fabric', () => { }).rejects.toThrow(TransactionNotFoundError); }); - it('throws a TransactionError for other errors', async () => { + it('throws an Error for other errors', async () => { mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); await expect(async () => { await submitTransaction( - mockContract, - redis, + mockTransaction, 'mspid', 'txn', 'arga', 'argb' ); - }).rejects.toThrow(TransactionError); + }).rejects.toThrow(Error); }); }); - describe('blockEventHandler', () => { - let redis: Redis; - let mockIsValidGetter: jest.Mock; - let mockTransactionIdGetter: jest.Mock; - let mockTransactionEvent: MockProxy; - let mockBlockEvent: MockProxy; + describe('getTransactionValidationCode', () => { + it('gets the validation code from a processed transaction', async () => { + const processedTransactionProto = + fabricProtos.protos.ProcessedTransaction.create(); + processedTransactionProto.validationCode = + fabricProtos.protos.TxValidationCode.VALID; + const processedTransactionBuffer = Buffer.from( + fabricProtos.protos.ProcessedTransaction.encode( + processedTransactionProto + ).finish() + ); - beforeEach(async () => { - const redisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, - }; - - redis = new IORedis(redisOptions) as unknown as Redis; - addMockTransationDetails(redis); - - const baseMock = {}; - mockTransactionEvent = mock(baseMock); - mockIsValidGetter = jest.fn(); - Object.defineProperty(baseMock, 'isValid', { get: mockIsValidGetter }); - mockTransactionIdGetter = jest.fn(); - Object.defineProperty(baseMock, 'transactionId', { - get: mockTransactionIdGetter, - }); - - mockBlockEvent = mock(); - mockBlockEvent.getTransactionEvents.mockReturnValue([ - mockTransactionEvent, - ]); - }); - - it('clears saved details for valid transactions', async () => { - const blockListener = blockEventHandler(redis); - mockIsValidGetter.mockReturnValue(true); - mockTransactionIdGetter.mockReturnValue(mockTransactionId); - - await blockListener(mockBlockEvent); - - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([]); - }); - - it('does not clear saved details for invalid transactions', async () => { - const blockListener = blockEventHandler(redis); - mockIsValidGetter.mockReturnValue(false); - - await blockListener(mockBlockEvent); - - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([ - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95', - ]); + const mockTransaction = mock(); + mockTransaction.evaluate.mockResolvedValue(processedTransactionBuffer); + const mockContract = mock(); + mockContract.createTransaction + .calledWith('GetTransactionByID') + .mockReturnValue(mockTransaction); + expect(await getTransactionValidationCode(mockContract, 'txn1')).toBe( + 'VALID' + ); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/fabric.ts b/asset-transfer-basic/rest-api-typescript/src/fabric.ts index aa8586ea..70a3a6f2 100644 --- a/asset-transfer-basic/rest-api-typescript/src/fabric.ts +++ b/asset-transfer-basic/rest-api-typescript/src/fabric.ts @@ -10,23 +10,20 @@ import { GatewayOptions, Wallets, Network, - BlockListener, - BlockEvent, - TransactionEvent, + TimeoutError, + Transaction, Wallet, } from 'fabric-network'; -import { Redis } from 'ioredis'; import * as config from './config'; import { logger } from './logger'; import { - storeTransactionDetails, - getRetryTransactionDetails, - clearTransactionDetails, - incrementRetryCount, - TransactionDetails, -} from './redis'; -import { handleError, isDuplicateTransactionError } from './errors'; -import protos from 'fabric-protos'; + handleError, + isContractError, + isDuplicateTransactionError, +} from './errors'; +import * as protos from 'fabric-protos'; +import { Job } from 'bullmq'; +import { JobData, JobResult, updateJobData } from './jobs'; /* * Creates an in memory wallet to hold credentials for an Org1 and Org2 user @@ -124,55 +121,119 @@ export const getContracts = async ( }; /* - * Starts a timer to retry transactions at regular intervals + * Process a submit transaction request from the job queue * - * Note: there is check for whether the transaction has successfully completed - * since it could succeed between any check and the retry, so the additional - * transaction to get the status is unlikely to be worthwhile + * For this sample transactions are retried if they fail with any error, + * except for errors from the smart contract, or duplicate transaction + * errors + * + * You might decide to retry transactions which fail with specific errors + * instead, for example: + * MVCC_READ_CONFLICT + * PHANTOM_READ_CONFLICT + * ENDORSEMENT_POLICY_FAILURE + * CHAINCODE_VERSION_CONFLICT + * EXPIRED_CHAINCODE */ -export const startRetryLoop = ( +export const processSubmitTransactionJob = async ( contracts: Map, - redis: Redis -): void => { - const retryInterval = setInterval( - async (contracts, redis) => { - if (logger.isLevelEnabled('debug')) { - try { - const pendingTransactionCount = await (redis as Redis).zcard( - 'index:txn:timestamp' - ); - logger.debug( - '%d transactions awaiting retry', - pendingTransactionCount - ); - } catch (err) { - logger.warn({ err }, 'Error getting pending transaction count'); - } + job: Job +): Promise => { + logger.debug({ jobId: job.id, jobName: job.name }, 'Processing job'); + + const contract = contracts.get(job.data.mspid); + if (contract === undefined) { + logger.error( + { jobId: job.id, jobName: job.name }, + 'Contract not found for MSP ID %s', + job.data.mspid + ); + + // Retrying will not work, so give up with an unsuccessful result + return { + transactionError: undefined, + transactionPayload: undefined, + }; + } + + let transaction: Transaction; + if (job.data.transactionState) { + const savedState = job.data.transactionState; + logger.debug( + { + jobId: job.id, + jobName: job.name, + savedState, + }, + 'Using previously saved transaction state' + ); + + transaction = contract.deserializeTransaction(savedState); + } else { + logger.debug( + { + jobId: job.id, + jobName: job.name, + }, + 'Using new transaction' + ); + + transaction = contract.createTransaction(job.data.transactionName); + await updateJobData(job, transaction); + } + + try { + logger.debug( + { + jobId: job.id, + jobName: job.name, + transactionId: transaction.getTransactionId(), + }, + 'Submitting transaction' + ); + const args = job.data.transactionArgs; + const payload = await submitTransaction(transaction, ...args); + + return { + transactionError: undefined, + transactionPayload: payload, + }; + } catch (err) { + if ( + err instanceof Error && + (isContractError(err) || isDuplicateTransactionError(err)) + ) { + logger.error( + { jobId: job.id, jobName: job.name, err }, + 'Fatal transaction error occurred' + ); + + // Return a job result to stop retrying + return { + transactionError: err.toString(), + transactionPayload: undefined, + }; + } else { + logger.warn( + { jobId: job.id, jobName: job.name, err }, + 'Retryable transaction error occurred' + ); + + // The original transaction may eventually get committed in the case of + // a timeout error, so keep the same transaction ID to protect against + // unintended duplicate transactions + if (!(err instanceof TimeoutError)) { + logger.debug( + { jobId: job.id, jobName: job.name }, + 'Clearing saved transaction state' + ); + await updateJobData(job, undefined); } - const savedTransaction = await getRetryTransactionDetails(redis); - - if (savedTransaction) { - const contract = contracts.get(savedTransaction.mspId); - - if (contract) { - await retryTransaction(contract, redis, savedTransaction); - } else { - clearTransactionDetails(redis, savedTransaction.transactionId); - logger.error( - 'No contract found for %s to retry transaction %s', - savedTransaction.mspId, - savedTransaction.transactionId - ); - } - } - }, - config.retryDelay, - contracts, - redis - ); - - retryInterval.unref(); + // Rethrow the error to keep retrying + throw err; + } + } }; /* @@ -183,146 +244,64 @@ export const evatuateTransaction = async ( transactionName: string, ...transactionArgs: string[] ): Promise => { - const txn = contract.createTransaction(transactionName); - const txnId = txn.getTransactionId(); + const transaction = contract.createTransaction(transactionName); + const transactionId = transaction.getTransactionId(); + logger.trace({ transaction }, 'Evaluating transaction'); try { - const payload = await txn.evaluate(...transactionArgs); - logger.debug( - { transactionId: txnId, payload: payload.toString() }, + const payload = await transaction.evaluate(...transactionArgs); + logger.trace( + { transactionId: transactionId, payload: payload.toString() }, 'Evaluate transaction response received' ); return payload; + } catch (err) { + throw handleError(transactionId, err); + } +}; + +/* + * Submit a transaction and handle any errors + */ +export const submitTransaction = async ( + transaction: Transaction, + ...transactionArgs: string[] +): Promise => { + logger.trace({ transaction }, 'Submitting transaction'); + const txnId = transaction.getTransactionId(); + + try { + const payload = await transaction.submit(...transactionArgs); + logger.trace( + { transactionId: txnId, payload: payload.toString() }, + 'Submit transaction response received' + ); + return payload; } catch (err) { throw handleError(txnId, err); } }; /* - * Submit a transaction and handle any errors - * - * Transaction details are saved before being submitted so that they can be - * retried if any errors occur + * Get the validation code of the specified transaction */ -export const submitTransaction = async ( - contract: Contract, - redis: Redis, - mspId: string, - transactionName: string, - ...transactionArgs: string[] +export const getTransactionValidationCode = async ( + qsccContract: Contract, + transactionId: string ): Promise => { - const txn = contract.createTransaction(transactionName); - const txnId = txn.getTransactionId(); - const txnState = txn.serialize(); - const txnArgs = JSON.stringify(transactionArgs); - const timestamp = Date.now(); + const data = await evatuateTransaction( + qsccContract, + 'GetTransactionByID', + config.channelName, + transactionId + ); - try { - // Store the transaction details and set the event handler in case there - // are problems later with commiting the transaction - await storeTransactionDetails( - redis, - txnId, - mspId, - txnState, - txnArgs, - timestamp - ); - txn.setEventHandler(DefaultEventHandlerStrategies.NONE); - await txn.submit(...transactionArgs); - } catch (err) { - // If the transaction failed to endorse, there is no point attempting - // to retry it later so clear the transaction details - // TODO will this always catch endorsement errors or can they - // arrive later? - await clearTransactionDetails(redis, txnId); - throw handleError(txnId, err); - } + const processedTransaction = protos.protos.ProcessedTransaction.decode(data); + const validationCode = + protos.protos.TxValidationCode[processedTransaction.validationCode]; - return txnId; -}; - -/* - * Retry a transaction - * - * The saved transaction details include a retry count which is used to ensure - * failing transactions are not retried indefinitely - */ -const retryTransaction = async ( - contract: Contract, - redis: Redis, - savedTransaction: TransactionDetails -): Promise => { - logger.debug('Retrying transaction %s', savedTransaction.transactionId); - - try { - const transaction = contract.deserializeTransaction( - savedTransaction.transactionState - ); - const args: string[] = JSON.parse(savedTransaction.transactionArgs); - - const payload = await transaction.submit(...args); - logger.debug( - { - transactionId: savedTransaction.transactionId, - payload: payload.toString(), - }, - 'Retry transaction response received' - ); - - await clearTransactionDetails(redis, savedTransaction.transactionId); - } catch (err) { - if (isDuplicateTransactionError(err)) { - logger.warn( - 'Transaction %s has already been committed', - savedTransaction.transactionId - ); - await clearTransactionDetails(redis, savedTransaction.transactionId); - } else { - logger.warn( - err, - 'Retry %d failed for transaction %s', - savedTransaction.retries, - savedTransaction.transactionId - ); - - if (savedTransaction.retries < config.maxRetryCount) { - await incrementRetryCount(redis, savedTransaction.transactionId); - } else { - await clearTransactionDetails(redis, savedTransaction.transactionId); - } - } - } -}; - -/* - * Block event listener to handle successful transactions - * - * Transaction details are saved before being submitted so that they can be - * retried, and this listener deletes those transaction details for any - * successful transactions - * - * Transactions can be submitted using one of two identities however one one - * of those identities is used to listen for block events - */ -export const blockEventHandler = (redis: Redis): BlockListener => { - const blockListener = async (event: BlockEvent) => { - logger.debug( - { blockNumber: event.blockNumber.toString() }, - 'Block event received' - ); - const transactionEvents: Array = - event.getTransactionEvents(); - - for (const event of transactionEvents) { - if (event && event.isValid) { - logger.debug('Remove transation with txnId %s', event.transactionId); - await clearTransactionDetails(redis, event.transactionId); - } - } - }; - - return blockListener; + logger.debug({ transactionId }, 'Validation code: %s', validationCode); + return validationCode; }; /* @@ -340,6 +319,7 @@ export const getBlockHeight = async ( ); const info = protos.common.BlockchainInfo.decode(data); const blockHeight = info.height; + logger.debug('Current block height: %d', blockHeight); return blockHeight; }; diff --git a/asset-transfer-basic/rest-api-typescript/src/health.router.ts b/asset-transfer-basic/rest-api-typescript/src/health.router.ts index 27b40c3d..463ee0e4 100644 --- a/asset-transfer-basic/rest-api-typescript/src/health.router.ts +++ b/asset-transfer-basic/rest-api-typescript/src/health.router.ts @@ -8,6 +8,8 @@ import { getReasonPhrase, StatusCodes } from 'http-status-codes'; import { getBlockHeight } from './fabric'; import { logger } from './logger'; import * as config from './config'; +import { Queue } from 'bullmq'; +import { getJobCounts } from './jobs'; const { SERVICE_UNAVAILABLE, OK } = StatusCodes; @@ -27,21 +29,26 @@ healthRouter.get('/ready', (_req, res: Response) => healthRouter.get('/live', async (req: Request, res: Response) => { logger.debug(req.body, 'Liveness request received'); - const qsccOrg1 = req.app.get(config.mspIdOrg1).qsccContract as Contract; - const qsccOrg2 = req.app.get(config.mspIdOrg2).qsccContract as Contract; - try { - await Promise.all([getBlockHeight(qsccOrg1), getBlockHeight(qsccOrg2)]); - } catch (err) { - logger.error(err, 'Error processing liveness request'); + const submitQueue = req.app.get('jobq') as Queue; + const qsccOrg1 = req.app.get(config.mspIdOrg1).qsccContract as Contract; + const qsccOrg2 = req.app.get(config.mspIdOrg2).qsccContract as Contract; - res.status(SERVICE_UNAVAILABLE).json({ + await Promise.all([ + getBlockHeight(qsccOrg1), + getBlockHeight(qsccOrg2), + getJobCounts(submitQueue), + ]); + } catch (err) { + logger.error({ err }, 'Error processing liveness request'); + + return res.status(SERVICE_UNAVAILABLE).json({ status: getReasonPhrase(SERVICE_UNAVAILABLE), timestamp: new Date().toISOString(), }); } - res.status(OK).json({ + return res.status(OK).json({ status: getReasonPhrase(OK), timestamp: new Date().toISOString(), }); diff --git a/asset-transfer-basic/rest-api-typescript/src/index.ts b/asset-transfer-basic/rest-api-typescript/src/index.ts index 98f91ebc..567d3cbc 100644 --- a/asset-transfer-basic/rest-api-typescript/src/index.ts +++ b/asset-transfer-basic/rest-api-typescript/src/index.ts @@ -2,19 +2,93 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { Contract } from 'fabric-network'; import * as config from './config'; +import { + createGateway, + createWallet, + getContracts, + getNetwork, +} from './fabric'; +import { + initJobQueue, + initJobQueueScheduler, + initJobQueueWorker, +} from './jobs'; import { logger } from './logger'; import { createServer } from './server'; +import { isMaxmemoryPolicyNoeviction } from './redis'; +import { Queue, QueueScheduler, Worker } from 'bullmq'; + +let jobQueue: Queue | undefined; +let jobQueueWorker: Worker | undefined; +let jobQueueScheduler: QueueScheduler | undefined; async function main() { + logger.info('Checking Redis config'); + if (!(await isMaxmemoryPolicyNoeviction())) { + throw new Error( + 'Invalid redis configuration: redis instance must have the setting maxmemory-policy=noeviction' + ); + } + + logger.info('Connecting to Fabric network'); + const wallet = await createWallet(); + + const gatewayOrg1 = await createGateway( + config.connectionProfileOrg1, + config.mspIdOrg1, + wallet + ); + const networkOrg1 = await getNetwork(gatewayOrg1); + const contractsOrg1 = await getContracts(networkOrg1); + + const gatewayOrg2 = await createGateway( + config.connectionProfileOrg2, + config.mspIdOrg2, + wallet + ); + const networkOrg2 = await getNetwork(gatewayOrg2); + const contractsOrg2 = await getContracts(networkOrg2); + + const assetContracts = new Map(); + assetContracts.set(config.mspIdOrg1, contractsOrg1.assetContract); + assetContracts.set(config.mspIdOrg2, contractsOrg2.assetContract); + + logger.info('Initialising submit job queue'); + jobQueue = initJobQueue(); + jobQueueWorker = initJobQueueWorker(assetContracts); + if (config.submitJobQueueScheduler === true) { + logger.info('Initialising submit job queue scheduler'); + jobQueueScheduler = initJobQueueScheduler(); + } + + logger.info('Creating REST server'); const app = await createServer(); + app.set(config.mspIdOrg1, contractsOrg1); + app.set(config.mspIdOrg2, contractsOrg2); + app.set('jobq', jobQueue); app.listen(config.port, () => { - logger.info('Express server started on port: %d', config.port); + logger.info('REST server started on port: %d', config.port); }); } -// TODO handle errors! E.g. try starting with the wrong cert and private key! -main().catch((err) => { - logger.error(err, 'Unxepected error'); +main().catch(async (err) => { + logger.error({ err }, 'Unxepected error'); + + if (jobQueueScheduler != undefined) { + logger.debug('Closing job queue scheduler'); + await jobQueueScheduler.close(); + } + + if (jobQueueWorker != undefined) { + logger.debug('Closing job queue worker'); + await jobQueueWorker.close(); + } + + if (jobQueue != undefined) { + logger.debug('Closing job queue'); + await jobQueue.close(); + } }); diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts new file mode 100644 index 00000000..097d19a7 --- /dev/null +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.router.ts @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Queue } from 'bullmq'; +import express, { Request, Response } from 'express'; +import { getReasonPhrase, StatusCodes } from 'http-status-codes'; +import { JobNotFoundError } from './errors'; +import { getJobSummary } from './jobs'; +import { logger } from './logger'; + +const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes; + +export const jobsRouter = express.Router(); + +jobsRouter.get('/:jobId', async (req: Request, res: Response) => { + const jobId = req.params.jobId; + logger.debug('Read request received for job ID %s', jobId); + + try { + const submitQueue = req.app.get('jobq') as Queue; + + const jobSummary = await getJobSummary(submitQueue, jobId); + + return res.status(OK).json(jobSummary); + } catch (err) { + logger.error({ err }, 'Error processing read request for job ID %s', jobId); + + if (err instanceof JobNotFoundError) { + return res.status(NOT_FOUND).json({ + status: getReasonPhrase(NOT_FOUND), + timestamp: new Date().toISOString(), + }); + } + + return res.status(INTERNAL_SERVER_ERROR).json({ + status: getReasonPhrase(INTERNAL_SERVER_ERROR), + timestamp: new Date().toISOString(), + }); + } +}); diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts new file mode 100644 index 00000000..05fe6af0 --- /dev/null +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.spec.ts @@ -0,0 +1,155 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Job, Queue } from 'bullmq'; +import { getJobCounts, getJobSummary } from './jobs'; +import { mock, MockProxy } from 'jest-mock-extended'; +import { JobNotFoundError } from './errors'; + +describe('initJobQueue', () => { + it.todo('write tests'); +}); + +describe('initJobQueueWorker', () => { + it.todo('write tests'); +}); + +describe('initJobQueueScheduler', () => { + it.todo('write tests'); +}); + +describe('addSubmitTransactionJob', () => { + it.todo('write tests'); +}); + +describe('getJobSummary', () => { + let mockQueue: MockProxy; + let mockJob: MockProxy; + + beforeEach(() => { + mockQueue = mock(); + mockJob = mock(); + }); + + it('throws a JobNotFoundError if the Job is undefined', async () => { + mockQueue.getJob.calledWith('1').mockResolvedValue(undefined); + + await expect(async () => { + await getJobSummary(mockQueue, '1'); + }).rejects.toThrow(JobNotFoundError); + }); + + it('gets a job summary with transaction payload data', async () => { + mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob); + mockJob.id = '1'; + mockJob.data = { + transactionIds: ['txn1'], + }; + mockJob.returnvalue = { + transactionPayload: Buffer.from('MOCK PAYLOAD'), + }; + + expect(await getJobSummary(mockQueue, '1')).toStrictEqual({ + jobId: '1', + transactionIds: ['txn1'], + transactionError: undefined, + transactionPayload: 'MOCK PAYLOAD', + }); + }); + + it('gets a job summary with empty transaction payload data', async () => { + mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob); + mockJob.id = '1'; + mockJob.data = { + transactionIds: ['txn1'], + }; + mockJob.returnvalue = { + transactionPayload: Buffer.from(''), + }; + + expect(await getJobSummary(mockQueue, '1')).toStrictEqual({ + jobId: '1', + transactionIds: ['txn1'], + transactionError: undefined, + transactionPayload: '', + }); + }); + + it('gets a job summary with a transaction error', async () => { + mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob); + mockJob.id = '1'; + mockJob.data = { + transactionIds: ['txn1'], + }; + mockJob.returnvalue = { + transactionError: 'MOCK ERROR', + }; + + expect(await getJobSummary(mockQueue, '1')).toStrictEqual({ + jobId: '1', + transactionIds: ['txn1'], + transactionError: 'MOCK ERROR', + transactionPayload: '', + }); + }); + + it('gets a job summary when there is no return value', async () => { + mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob); + mockJob.id = '1'; + mockJob.returnvalue = undefined; + mockJob.data = { + transactionIds: ['txn1'], + }; + + expect(await getJobSummary(mockQueue, '1')).toStrictEqual({ + jobId: '1', + transactionIds: ['txn1'], + transactionError: undefined, + transactionPayload: undefined, + }); + }); + + it('gets a job summary when there is no job data', async () => { + mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob); + mockJob.id = '1'; + mockJob.data = undefined; + mockJob.returnvalue = { + transactionPayload: Buffer.from('MOCK PAYLOAD'), + }; + + expect(await getJobSummary(mockQueue, '1')).toStrictEqual({ + jobId: '1', + transactionIds: [], + transactionError: undefined, + transactionPayload: 'MOCK PAYLOAD', + }); + }); +}); + +describe('updateSubmitTransactionJobStateData', () => { + it.todo('write tests'); +}); + +describe('getJobCounts', () => { + it('gets job counts from the specified queue', async () => { + const mockQueue = mock(); + mockQueue.getJobCounts + .calledWith('active', 'completed', 'delayed', 'failed', 'waiting') + .mockResolvedValue({ + active: 1, + completed: 2, + delayed: 3, + failed: 4, + waiting: 5, + }); + + expect(await getJobCounts(mockQueue)).toStrictEqual({ + active: 1, + completed: 2, + delayed: 3, + failed: 4, + waiting: 5, + }); + }); +}); diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.ts new file mode 100644 index 00000000..da397a2d --- /dev/null +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.ts @@ -0,0 +1,216 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * This sample uses BullMQ jobs to process submit transactions, which includes + * retry support for failing jobs + * + * Important: BullMQ requires the following setting in redis + * maxmemory-policy=noeviction + * For details, see: https://docs.bullmq.io/guide/connections + */ + +import { ConnectionOptions, Job, Queue, QueueScheduler, Worker } from 'bullmq'; +import { Contract, Transaction } from 'fabric-network'; +import * as config from './config'; +import { JobNotFoundError } from './errors'; +import { processSubmitTransactionJob } from './fabric'; +import { logger } from './logger'; + +export type JobData = { + mspid: string; + transactionName: string; + transactionArgs: string[]; + transactionState?: Buffer; + transactionIds: string[]; +}; + +export type JobResult = { + transactionPayload?: Buffer; + transactionError?: string; +}; + +// TODO include attempts made? +export type JobSummary = { + jobId: string; + transactionIds: string[]; + transactionPayload?: string; + transactionError?: string; +}; + +const connection: ConnectionOptions = { + port: config.redisPort, + host: config.redisHost, + username: config.redisUsername, + password: config.redisPassword, +}; + +export const initJobQueue = (): Queue => { + const submitQueue = new Queue(config.JOB_QUEUE_NAME, { + connection, + defaultJobOptions: { + attempts: config.submitJobAttempts, + backoff: { + type: config.submitJobBackoffType, + delay: config.submitJobBackoffDelay, + }, + removeOnComplete: config.maxCompletedSubmitJobs, + removeOnFail: config.maxFailedSubmitJobs, + }, + }); + + return submitQueue; +}; + +export const initJobQueueWorker = ( + contracts: Map +): Worker => { + const worker = new Worker( + config.JOB_QUEUE_NAME, + async (job): Promise => { + return await processSubmitTransactionJob(contracts, job); + }, + { connection, concurrency: config.submitJobConcurrency } + ); + + worker.on('failed', (job) => { + logger.error({ job }, 'Job failed'); // WHY?! + }); + + // Important: need to handle this error otherwise worker may stop + // processing jobs + worker.on('error', (err) => { + logger.error({ err }, 'Worker error'); + }); + + if (logger.isLevelEnabled('debug')) { + worker.on('completed', (job) => { + logger.debug({ job }, 'Job completed'); + }); + } + + return worker; +}; + +export const initJobQueueScheduler = (): QueueScheduler => { + const queueScheduler = new QueueScheduler(config.JOB_QUEUE_NAME, { + connection, + }); + + queueScheduler.on('failed', (jobId, failedReason) => { + // TODO when does this happen, and how should it be handled? + logger.error({ jobId, failedReason }, 'Queue sceduler failure'); + }); + + return queueScheduler; +}; + +export const addSubmitTransactionJob = async ( + submitQueue: Queue, + mspid: string, + transactionName: string, + ...transactionArgs: string[] +): Promise => { + const jobName = `submit ${transactionName} transaction`; + const job = await submitQueue.add(jobName, { + mspid, + transactionName, + transactionArgs: transactionArgs, + transactionIds: [], + }); + + if (job?.id === undefined) { + throw new Error('Submit transaction job ID not available'); + } + + return job.id; +}; + +/* + * Gets a summary for the jobs endpoint + */ +export const getJobSummary = async ( + queue: Queue, + jobId: string +): Promise => { + const job: Job | undefined = await queue.getJob(jobId); + logger.debug({ job }, 'Got job'); + + if (!(job && job.id != undefined)) { + throw new JobNotFoundError(`Job ${jobId} not found`, jobId); + } + + let transactionIds: string[]; + if (job.data && job.data.transactionIds) { + transactionIds = job.data.transactionIds; + } else { + transactionIds = []; + } + + let transactionError; + let transactionPayload; + const returnValue = job.returnvalue; + if (returnValue) { + if (returnValue.transactionError) { + transactionError = returnValue.transactionError; + } + + if ( + returnValue.transactionPayload && + returnValue.transactionPayload.length > 0 + ) { + transactionPayload = returnValue.transactionPayload.toString(); + } else { + transactionPayload = ''; + } + } + + const jobSummary: JobSummary = { + jobId: job.id, + transactionIds, + transactionError, + transactionPayload, + }; + + return jobSummary; +}; + +export const updateJobData = async ( + job: Job, + transaction: Transaction | undefined +): Promise => { + const newData = { ...job.data }; + + if (transaction != undefined) { + const transationIds = ([] as string[]).concat( + newData.transactionIds, + transaction.getTransactionId() + ); + newData.transactionIds = transationIds; + + newData.transactionState = transaction.serialize(); + } else { + newData.transactionState = undefined; + } + + await job.update(newData); +}; + +/* + * Get the current job counts + * + * This function is used for the liveness REST endpoint + */ +export const getJobCounts = async ( + queue: Queue +): Promise<{ [index: string]: number }> => { + const jobCounts = await queue.getJobCounts( + 'active', + 'completed', + 'delayed', + 'failed', + 'waiting' + ); + logger.debug({ jobCounts }, 'Current job counts'); + + return jobCounts; +}; diff --git a/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts b/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts index 8b4f291c..a73f3b8c 100644 --- a/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts +++ b/asset-transfer-basic/rest-api-typescript/src/redis.spec.ts @@ -2,183 +2,33 @@ * SPDX-License-Identifier: Apache-2.0 */ -import * as config from './config'; -import IORedis, { Redis } from 'ioredis'; -import { - clearTransactionDetails, - incrementRetryCount, - storeTransactionDetails, - getTransactionDetails, - getRetryTransactionDetails, -} from './redis'; +import { isMaxmemoryPolicyNoeviction } from './redis'; -jest.mock('ioredis', () => require('ioredis-mock/jest')); +const mockRedisConfig = jest.fn(); +jest.mock('ioredis', () => { + return jest.fn().mockImplementation(() => { + return { + config: mockRedisConfig, + disconnect: jest.fn(), + }; + }); +}); jest.mock('./config'); describe('Redis', () => { - let redis: Redis; - - const mockTransactionId = - '0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95'; - const mockKey = `txn:${mockTransactionId}`; - const mockMspId = 'Org1MSP'; - const mockState = Buffer.from( - `{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}` - ); - const mockArgs = '["test111","red",400,"Jean",101]'; - const mockTimestamp = 1628078044362; - - const addMockTransationDetails = async (redis: Redis) => { - await redis - .multi() - .hset( - mockKey, - 'mspId', - mockMspId, - 'state', - mockState, - 'args', - mockArgs, - 'timestamp', - mockTimestamp, - 'retries', - '0' - ) - .zadd('index:txn:timestamp', mockTimestamp, mockTransactionId) - .exec(); - }; - - beforeEach(async () => { - const redisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, - }; - - redis = new IORedis(redisOptions) as unknown as Redis; - }); - describe('storeTransactionDetails', () => { - it('stores transaction details as a hash', async () => { - await storeTransactionDetails( - redis, - mockTransactionId, - mockMspId, - mockState, - mockArgs, - mockTimestamp - ); - - const storedTransaction = await redis.hgetall(mockKey); - const expectedTransaction = { - mspId: mockMspId, - state: mockState, - args: mockArgs, - retries: '0', - timestamp: mockTimestamp.toString(), - }; - expect(storedTransaction).toStrictEqual(expectedTransaction); - }); - - it('adds the transaction ID to the sorted set timestamp index', async () => { - await storeTransactionDetails( - redis, - mockTransactionId, - mockMspId, - mockState, - mockArgs, - mockTimestamp - ); - - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([mockTransactionId]); - }); - - // TODO this seems to work for spying/mocking... - // jest.spyOn(redis, 'multi').mock... - // but haven't worked out how to spy on the hset, zadd, exec in that chain - // Ask Mark? - it.todo('handles an error from redis'); + beforeEach(() => { + mockRedisConfig.mockClear(); }); - describe('getTransactionDetails', () => { - it('gets the transaction details from a hash', async () => { - await addMockTransationDetails(redis); - - const details = await getTransactionDetails(redis, mockTransactionId); - - expect(details).toStrictEqual({ - transactionId: mockTransactionId, - mspId: mockMspId, - transactionState: mockState, - transactionArgs: mockArgs, - retries: 0, - timestamp: mockTimestamp, - }); + describe('isMaxmemoryPolicyNoeviction', () => { + it('returns true when the maxmemory-policy is noeviction', async () => { + mockRedisConfig.mockReturnValue(['maxmemory-policy', 'noeviction']); + expect(await isMaxmemoryPolicyNoeviction()).toBe(true); }); - it.todo('handles an error from redis'); - }); - - describe('getRetryTransactionDetails', () => { - it('gets the oldest transaction details from a hash', async () => { - await addMockTransationDetails(redis); - - const details = await getRetryTransactionDetails(redis); - - expect(details).toStrictEqual({ - transactionId: mockTransactionId, - mspId: mockMspId, - transactionState: mockState, - transactionArgs: mockArgs, - retries: 0, - timestamp: mockTimestamp, - }); + it('returns false when the maxmemory-policy is not noeviction', async () => { + mockRedisConfig.mockReturnValue(['maxmemory-policy', 'allkeys-lru']); + expect(await isMaxmemoryPolicyNoeviction()).toBe(false); }); - - it('gets undefined if there are no transactions to retry', async () => { - const details = await getRetryTransactionDetails(redis); - - expect(details).toBeUndefined(); - }); - - it.todo('handles an error from redis'); - }); - - describe('clearTransactionDetails', () => { - it('removes the transaction details hash', async () => { - await addMockTransationDetails(redis); - - await clearTransactionDetails(redis, mockTransactionId); - - const storedTransaction = await redis.hgetall(mockKey); - expect(storedTransaction).not.toHaveProperty('state'); - }); - - it('removes the transaction ID from the sorted set timestamp index', async () => { - await addMockTransationDetails(redis); - - await clearTransactionDetails(redis, mockTransactionId); - - const index = await redis.zrange('index:txn:timestamp', 0, -1); - expect(index).toStrictEqual([]); - }); - }); - - describe('incrementRetryCount', () => { - it('increments the retries value in the transction details hash', async () => { - await addMockTransationDetails(redis); - - await incrementRetryCount(redis, mockTransactionId); - - const retries = await redis.hget(mockKey, 'retries'); - expect(retries).toBe('1'); - }); - - it.todo( - 'updates the position of the transaction ID in the sorted set timestamp index' - ); - - it.todo('handles an error from redis'); }); }); diff --git a/asset-transfer-basic/rest-api-typescript/src/redis.ts b/asset-transfer-basic/rest-api-typescript/src/redis.ts index fc89fa9b..bb4246da 100644 --- a/asset-transfer-basic/rest-api-typescript/src/redis.ts +++ b/asset-transfer-basic/rest-api-typescript/src/redis.ts @@ -1,12 +1,7 @@ /* * SPDX-License-Identifier: Apache-2.0 * - * This sample includes basic retry logic so it needs somewhere to store - * transaction details in case the app restarts for any reason, and Redis is - * just one of the options available - * - * Note: This implementation is not designed with multiple instances of the - * REST app in mind, which is likely to be required in a production environment + * TBC */ import IORedis, { Redis, RedisOptions } from 'ioredis'; @@ -14,184 +9,43 @@ import IORedis, { Redis, RedisOptions } from 'ioredis'; import * as config from './config'; import { logger } from './logger'; -const redisOptions: RedisOptions = { - port: config.redisPort, - host: config.redisHost, - username: config.redisUsername, - password: config.redisPassword, -}; - -export const redis = new IORedis(redisOptions); - -export type TransactionDetails = { - transactionId: string; - mspId: string; - transactionState: Buffer; - transactionArgs: string; - timestamp: number; - retries: number; -}; - /* - * Store enough information in order to resubmit a transaction + * Check whether the maxmemory-policy config is set to noeviction + * + * BullMQ requires this setting in redis + * For details, see: https://docs.bullmq.io/guide/connections */ -export const storeTransactionDetails = async ( - redis: Redis, - transactionId: string, - mspId: string, - transactionState: Buffer, - transactionArgs: string, - timestamp: number -): Promise => { +export const isMaxmemoryPolicyNoeviction = async (): Promise => { + let redis: Redis | undefined; + + const redisOptions: RedisOptions = { + port: config.redisPort, + host: config.redisHost, + username: config.redisUsername, + password: config.redisPassword, + }; + try { - const key = `txn:${transactionId}`; - logger.debug( - { - key, - mspId, - transactionState, - transactionArgs, - timestamp, - }, - 'Storing transaction details' - ); - await redis - .multi() - .hset( - key, - 'mspId', - mspId, - 'state', - transactionState, - 'args', - transactionArgs, - 'timestamp', - timestamp, - 'retries', - '0' - ) - .zadd('index:txn:timestamp', timestamp, transactionId) - .exec(); - } catch (err) { - // TODO just log?! - logger.error( - { err }, - 'Error storing details for transaction ID %s', - transactionId - ); - } -}; + redis = new IORedis(redisOptions); -/* - * Get the information required to resubmit a transaction - */ -export const getTransactionDetails = async ( - redis: Redis, - transactionId: string -): Promise => { - try { - const savedTransaction = await (redis as Redis).hgetall( - `txn:${transactionId}` - ); - logger.debug( - { transactionId: transactionId, state: savedTransaction }, - 'Got transaction details' + const maxmemoryPolicyConfig = await (redis as Redis).config( + 'GET', + 'maxmemory-policy' ); + logger.debug({ maxmemoryPolicyConfig }, 'Got maxmemory-policy config'); - const transactionDetails = { - transactionId: transactionId, - mspId: savedTransaction.mspId, - transactionState: Buffer.from(savedTransaction.state), - transactionArgs: savedTransaction.args, - timestamp: parseInt(savedTransaction.timestamp), - retries: parseInt(savedTransaction.retries), - }; - return transactionDetails; - } catch (err) { - // TODO just log?! - logger.error( - { err }, - 'Error getting details for transaction ID %s', - transactionId - ); - } -}; - -/* - * Get the oldest transaction details - */ -export const getRetryTransactionDetails = async ( - redis: Redis -): Promise => { - try { - const transactionIds = await (redis as Redis).zrange( - 'index:txn:timestamp', - -1, - -1 - ); - - if (transactionIds.length > 0) { - const transactionId = transactionIds[0]; - - const savedTransaction = await getTransactionDetails( - redis, - transactionId - ); - return savedTransaction; + if ( + maxmemoryPolicyConfig.length == 2 && + 'maxmemory-policy' === maxmemoryPolicyConfig[0] && + 'noeviction' === maxmemoryPolicyConfig[1] + ) { + return true; + } + } finally { + if (redis != undefined) { + redis.disconnect(); } - } catch (err) { - // TODO just log?! - logger.error( - { err }, - 'Error getting details for next transaction to retry' - ); } -}; -/* - * Delete transaction details - */ -export const clearTransactionDetails = async ( - redis: Redis, - transactionId: string -): Promise => { - const key = `txn:${transactionId}`; - logger.debug('Removing transaction details. Key: %s', key); - try { - await redis - .multi() - .del(key) - .zrem('index:txn:timestamp', transactionId) - .exec(); - } catch (err) { - // TODO just log?! - logger.error( - { err }, - 'Error remove details for transaction ID %s', - transactionId - ); - } -}; - -/* - * Increment the number of times the transaction has been retried - - * TODO needs to update the timestamp and index as well - */ -export const incrementRetryCount = async ( - redis: Redis, - transactionId: string -): Promise => { - const key = `txn:${transactionId}`; - logger.debug('Incrementing retries fortransaction Key: %s', key); - try { - await (redis as Redis).hincrby(`txn:${transactionId}`, 'retries', 1); - } catch (err) { - // TODO just log?! - logger.error( - err, - 'Error incrementing retries for transaction ID %s', - transactionId - ); - } + return false; }; diff --git a/asset-transfer-basic/rest-api-typescript/src/server.ts b/asset-transfer-basic/rest-api-typescript/src/server.ts index e1f360aa..60ceb481 100644 --- a/asset-transfer-basic/rest-api-typescript/src/server.ts +++ b/asset-transfer-basic/rest-api-typescript/src/server.ts @@ -2,30 +2,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -import helmet from 'helmet'; -import { StatusCodes, getReasonPhrase } from 'http-status-codes'; import express, { Application, NextFunction, Request, Response } from 'express'; -import pinoMiddleware from 'pino-http'; -import { Contract } from 'fabric-network'; - -import { logger } from './logger'; -import { assetsRouter } from './assets.router'; -import { healthRouter } from './health.router'; -import { transactionsRouter } from './transactions.router'; -import { - getContracts, - getNetwork, - createGateway, - createWallet, - startRetryLoop, - blockEventHandler, -} from './fabric'; -import { redis } from './redis'; -import * as config from './config'; -const { BAD_REQUEST, INTERNAL_SERVER_ERROR, NOT_FOUND } = StatusCodes; - -import { authenticateApiKey, fabricAPIKeyStrategy } from './auth'; +import helmet from 'helmet'; +import { getReasonPhrase, StatusCodes } from 'http-status-codes'; import passport from 'passport'; +import pinoMiddleware from 'pino-http'; +import { assetsRouter } from './assets.router'; +import { authenticateApiKey, fabricAPIKeyStrategy } from './auth'; +import { healthRouter } from './health.router'; +import { jobsRouter } from './jobs.router'; +import { logger } from './logger'; +import { transactionsRouter } from './transactions.router'; + +const { BAD_REQUEST, INTERNAL_SERVER_ERROR, NOT_FOUND } = StatusCodes; export const createServer = async (): Promise => { const app = express(); @@ -71,42 +60,9 @@ export const createServer = async (): Promise => { app.use(helmet()); } - const wallet = await createWallet(); - - const gatewayOrg1 = await createGateway( - config.connectionProfileOrg1, - config.mspIdOrg1, - wallet - ); - const networkOrg1 = await getNetwork(gatewayOrg1); - const contractsOrg1 = await getContracts(networkOrg1); - app.set(config.mspIdOrg1, contractsOrg1); - - const gatewayOrg2 = await createGateway( - config.connectionProfileOrg2, - config.mspIdOrg2, - wallet - ); - const networkOrg2 = await getNetwork(gatewayOrg2); - const contractsOrg2 = await getContracts(networkOrg2); - app.set(config.mspIdOrg2, contractsOrg2); - - const assetContracts = new Map(); - assetContracts.set(config.mspIdOrg1, contractsOrg1.assetContract); - assetContracts.set(config.mspIdOrg2, contractsOrg2.assetContract); - startRetryLoop(assetContracts, redis); - - app.set('redis', redis); - - logger.debug('Adding block listener to %s network', config.blockListenerOrg); - if (config.blockListenerOrg === config.ORG1) { - await networkOrg1.addBlockListener(blockEventHandler(redis)); - } else { - await networkOrg2.addBlockListener(blockEventHandler(redis)); - } - app.use('/', healthRouter); app.use('/api/assets', authenticateApiKey, assetsRouter); + app.use('/api/jobs', authenticateApiKey, jobsRouter); app.use('/api/transactions', authenticateApiKey, transactionsRouter); // For everything else diff --git a/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts b/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts index a91c2fc5..4b729951 100644 --- a/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts +++ b/asset-transfer-basic/rest-api-typescript/src/transactions.router.ts @@ -4,81 +4,44 @@ import express, { Request, Response } from 'express'; import { Contract } from 'fabric-network'; -import { protos } from 'fabric-protos'; import { getReasonPhrase, StatusCodes } from 'http-status-codes'; -import { Redis } from 'ioredis'; -import { getTransactionDetails } from './redis'; -import { evatuateTransaction } from './fabric'; +import { getTransactionValidationCode } from './fabric'; import { logger } from './logger'; -import * as config from './config'; import { TransactionNotFoundError } from './errors'; const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes; export const transactionsRouter = express.Router(); -type Progress = 'ACCEPTED' | 'RETRYING' | 'DONE'; - transactionsRouter.get( '/:transactionId', async (req: Request, res: Response) => { + const mspId = req.user as string; const transactionId = req.params.transactionId; logger.debug('Read request received for transaction ID %s', transactionId); - let foundTransaction = false; - let progress: Progress = 'DONE'; - let validationCode = ''; - - const mspId = req.user as string; - const qscc = req.app.get(mspId).qsccContract as Contract; - const redis = req.app.get('redis') as Redis; - try { - const savedTransaction = await getTransactionDetails( - redis, + const qsccContract = req.app.get(mspId).qsccContract as Contract; + + const validationCode = await getTransactionValidationCode( + qsccContract, transactionId ); - if (savedTransaction?.transactionState) { - foundTransaction = true; - if (savedTransaction.retries > 0) { - progress = 'RETRYING'; - } else { - progress = 'ACCEPTED'; - } - } - } catch (err) { - logger.error( - err, - 'Redis error processing read request for transaction ID %s', - transactionId - ); - - return res.status(INTERNAL_SERVER_ERROR).json({ - status: getReasonPhrase(INTERNAL_SERVER_ERROR), - timestamp: new Date().toISOString(), + return res.status(OK).json({ + transactionId, + validationCode, }); - } - - try { - const data = await evatuateTransaction( - qscc, - 'GetTransactionByID', - config.channelName, - transactionId - ); - - foundTransaction = true; - // TODO is it possible to use the BlockDecoder decodeTransaction - // function in fabric-common? - const processedTransaction = protos.ProcessedTransaction.decode(data); - validationCode = - protos.TxValidationCode[processedTransaction.validationCode]; } catch (err) { - if (!(err instanceof TransactionNotFoundError)) { + if (err instanceof TransactionNotFoundError) { + return res.status(NOT_FOUND).json({ + status: getReasonPhrase(NOT_FOUND), + timestamp: new Date().toISOString(), + }); + } else { logger.error( - err, - 'Fabric error processing read request for transaction ID %s', + { err }, + 'Error processing read request for transaction ID %s', transactionId ); @@ -88,19 +51,5 @@ transactionsRouter.get( }); } } - - if (foundTransaction) { - return res.status(OK).json({ - status: getReasonPhrase(OK), - progress: progress, - validationCode: validationCode, - timestamp: new Date().toISOString(), - }); - } else { - return res.status(NOT_FOUND).json({ - status: getReasonPhrase(NOT_FOUND), - timestamp: new Date().toISOString(), - }); - } } );