Update retry logic

Previously transactions were only retried after being successfully endorsed, and always with the same transaction ID

Transactions will now be added to a queue for processing and will also be retried if endorsement fails (with a different transaction id for invalid transactions)

Signed-off-by: James Taylor <jamest@uk.ibm.com>
This commit is contained in:
James Taylor 2021-09-22 19:27:38 +01:00
parent 3e49e92703
commit ad3fd7e832
22 changed files with 1826 additions and 1452 deletions

View file

@ -8,6 +8,21 @@ The REST API is intended to work with the [basic asset transfer example](https:/
To install the basic asset transfer chaincode on a local Fabric network, follow the [Using the Fabric test network](https://hyperledger-fabric.readthedocs.io/en/release-2.2/test_network.html) tutorial To install the basic asset transfer chaincode on a local Fabric network, follow the [Using the Fabric test network](https://hyperledger-fabric.readthedocs.io/en/release-2.2/test_network.html) tutorial
## Overview
The sample creates two long lived connections to a Fabric network in order to submit and evaluate transactions using two different identities
To ensure requests respond quickly enough to avoid timeouts, all submit transactions are queued for processing and will be retried if they fail
Submit transactions are retried if they fail with any error, except for errors from the smart contract, or duplicate transaction errors
Alternatively you might prefer to modify the sample to only retry transactions which fail with specific errors instead, for example:
- MVCC_READ_CONFLICT
- PHANTOM_READ_CONFLICT
- ENDORSEMENT_POLICY_FAILURE
- CHAINCODE_VERSION_CONFLICT
- EXPIRED_CHAINCODE
## Usage ## Usage
**Note:** these instructions should work with the release-2.2 branch of `fabric-samples` but later versions require some changes **Note:** these instructions should work with the release-2.2 branch of `fabric-samples` but later versions require some changes
@ -70,7 +85,7 @@ docker-compose up -d
## REST API ## REST API
If everything went well, you can now make basic asset transfer REST calls! If everything went well, you can now open a new terminal and try out some basic asset transfer REST calls!
The examples below require a `SAMPLE_APIKEY` environment variable which must be set to an API key from the `.env` file created above. The examples below require a `SAMPLE_APIKEY` environment variable which must be set to an API key from the `.env` file created above.
@ -86,6 +101,12 @@ SAMPLE_APIKEY=$(grep ORG1_APIKEY .env | cut -d '=' -f 2-)
curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/assets curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/assets
``` ```
You should see all the available assets, for example
```
[{"AppraisedValue":300,"Color":"blue","ID":"asset1","Owner":"Tomoko","Size":5},{"AppraisedValue":400,"Color":"red","ID":"asset2","Owner":"Brad","Size":5},{"AppraisedValue":500,"Color":"green","ID":"asset3","Owner":"Jin Soo","Size":10},{"AppraisedValue":600,"Color":"yellow","ID":"asset4","Owner":"Max","Size":10},{"AppraisedValue":700,"Color":"black","ID":"asset5","Owner":"Adriana","Size":15},{"AppraisedValue":800,"Color":"white","ID":"asset6","Owner":"Michel","Size":15}]
```
### Check whether an asset exists... ### Check whether an asset exists...
```shell ```shell
@ -98,18 +119,52 @@ curl --include --header "X-Api-Key: ${SAMPLE_APIKEY}" --request OPTIONS http://l
curl --include --header "Content-Type: application/json" --header "X-Api-Key: ${SAMPLE_APIKEY}" --request POST --data '{"id":"asset7","color":"red","size":42,"owner":"Jean","appraisedValue":101}' http://localhost:3000/api/assets curl --include --header "Content-Type: application/json" --header "X-Api-Key: ${SAMPLE_APIKEY}" --request POST --data '{"id":"asset7","color":"red","size":42,"owner":"Jean","appraisedValue":101}' http://localhost:3000/api/assets
``` ```
The response should include a `jobId` which you can use to check the job status in next step
```
{"status":"Accepted","jobId":"1","timestamp":"2021-10-22T16:27:09.426Z"}
```
### Read job status...
```shell
curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/jobs/__job_id__
```
The response should include a list of `transactionIds` which you can use to check the transaction status in next step, for example
```
{"jobId":"1","transactionIds":["1dd35c2e5d840fec1dccc6e8cfce886c660c103de3e7b93dd774d04f39eef82a"],"transactionPayload":""}
```
There may be more transaction IDs if the job was retried
### Read transaction status... ### Read transaction status...
```shell ```shell
curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/transactions/__transaction_id__ curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/transactions/__transaction_id__
``` ```
The response will show the validation code of the transaction, for example
```
{"transactionId":"1dd35c2e5d840fec1dccc6e8cfce886c660c103de3e7b93dd774d04f39eef82a","validationCode":"VALID"}
```
Alternatively, you will get a 404 not found response if the transaction was not committed
### Read an asset... ### Read an asset...
```shell ```shell
curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/assets/asset7 curl --header "X-Api-Key: ${SAMPLE_APIKEY}" http://localhost:3000/api/assets/asset7
``` ```
You should see the newly created asset, for example
```
{"AppraisedValue":101,"Color":"red","ID":"asset7","Owner":"Jean","Size":42}
```
### Update an asset... ### Update an asset...
```shell ```shell

View file

@ -40,6 +40,11 @@ X-Api-Key: {{api-key}}
"appraisedValue": 101 "appraisedValue": 101
} }
### Read job status
GET {{apiUrl}}/jobs/__job_id__ HTTP/1.1
X-Api-Key: {{api-key}}
### Read transaction status ### Read transaction status
GET {{apiUrl}}/transactions/__transaction_id__ HTTP/1.1 GET {{apiUrl}}/transactions/__transaction_id__ HTTP/1.1

View file

@ -1955,6 +1955,67 @@
"resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.1.tgz",
"integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==" "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A=="
}, },
"bullmq": {
"version": "1.47.2",
"resolved": "https://registry.npmjs.org/bullmq/-/bullmq-1.47.2.tgz",
"integrity": "sha512-IMzWjXdw6B5RSqPyEiOvoA0efjfTFx2DuB1N+z3T2wYcOVLIcIFybbFjhqVn9Sv/Zb5l6TpuFiU52P+C+/DpNA==",
"requires": {
"@types/ioredis": "^4.27.0",
"cron-parser": "^2.7.3",
"get-port": "^5.0.0",
"ioredis": "^4.27.8",
"lodash": "^4.17.21",
"semver": "^6.3.0",
"tslib": "^1.10.0",
"uuid": "^8.3.2"
},
"dependencies": {
"@types/ioredis": {
"version": "4.27.4",
"resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.27.4.tgz",
"integrity": "sha512-uTAA/woL//GxXQI1e9FuUoDZCpP8yn5LXQdea1IEFyLtb8GP2w3HfOE+SqglF6QSAp/3cZLWzrMhHqWSYI3bfg==",
"requires": {
"@types/node": "*"
}
},
"debug": {
"version": "4.3.2",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.2.tgz",
"integrity": "sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw==",
"requires": {
"ms": "2.1.2"
}
},
"ioredis": {
"version": "4.27.9",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.9.tgz",
"integrity": "sha512-hAwrx9F+OQ0uIvaJefuS3UTqW+ByOLyLIV+j0EH8ClNVxvFyH9Vmb08hCL4yje6mDYT5zMquShhypkd50RRzkg==",
"requires": {
"cluster-key-slot": "^1.1.0",
"debug": "^4.3.1",
"denque": "^1.1.0",
"lodash.defaults": "^4.2.0",
"lodash.flatten": "^4.4.0",
"lodash.isarguments": "^3.1.0",
"p-map": "^2.1.0",
"redis-commands": "1.7.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
"standard-as-callback": "^2.1.0"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"semver": {
"version": "6.3.0",
"resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz",
"integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw=="
}
}
},
"bytes": { "bytes": {
"version": "3.1.0", "version": "3.1.0",
"resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz",
@ -2179,6 +2240,15 @@
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"dev": true "dev": true
}, },
"cron-parser": {
"version": "2.18.0",
"resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-2.18.0.tgz",
"integrity": "sha512-s4odpheTyydAbTBQepsqd2rNWGa2iV3cyo8g7zbI2QQYGLVsfbhmwukayS1XHppe02Oy1fg7mg6xoaraVJeEcg==",
"requires": {
"is-nan": "^1.3.0",
"moment-timezone": "^0.5.31"
}
},
"cross-spawn": { "cross-spawn": {
"version": "7.0.3", "version": "7.0.3",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
@ -2267,6 +2337,14 @@
"integrity": "sha512-FJ3UgI4gIl+PHZm53knsuSFpE+nESMr7M4v9QcgB7S63Kj/6WqMiFQJpBBYz1Pt+66bZpP3Q7Lye0Oo9MPKEdg==", "integrity": "sha512-FJ3UgI4gIl+PHZm53knsuSFpE+nESMr7M4v9QcgB7S63Kj/6WqMiFQJpBBYz1Pt+66bZpP3Q7Lye0Oo9MPKEdg==",
"dev": true "dev": true
}, },
"define-properties": {
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.1.3.tgz",
"integrity": "sha512-3MqfYKj2lLzdMSf8ZIZE/V+Zuy+BgD6f164e8K2w7dgnpKArBDerGYpM46IYYcjnkdPNMjPk9A6VFB8+3SKlXQ==",
"requires": {
"object-keys": "^1.0.12"
}
},
"delayed-stream": { "delayed-stream": {
"version": "1.0.0", "version": "1.0.0",
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz",
@ -3105,6 +3183,11 @@
"integrity": "sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==", "integrity": "sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==",
"dev": true "dev": true
}, },
"get-port": {
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/get-port/-/get-port-5.1.1.tgz",
"integrity": "sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ=="
},
"get-stream": { "get-stream": {
"version": "6.0.1", "version": "6.0.1",
"resolved": "https://registry.npmjs.org/get-stream/-/get-stream-6.0.1.tgz", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-6.0.1.tgz",
@ -3360,15 +3443,16 @@
"integrity": "sha512-7PnF4oN3CvZF23ADhA5wRaYEQpJ8qygSkbtTXWBeXWXmEVRXK+1ITciHWwHhsjv1TmW0MgacIv6hEi5pX5NQdA==" "integrity": "sha512-7PnF4oN3CvZF23ADhA5wRaYEQpJ8qygSkbtTXWBeXWXmEVRXK+1ITciHWwHhsjv1TmW0MgacIv6hEi5pX5NQdA=="
}, },
"ioredis": { "ioredis": {
"version": "4.27.6", "version": "4.27.9",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.6.tgz", "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.9.tgz",
"integrity": "sha512-6W3ZHMbpCa8ByMyC1LJGOi7P2WiOKP9B3resoZOVLDhi+6dDBOW+KNsRq3yI36Hmnb2sifCxHX+YSarTeXh48A==", "integrity": "sha512-hAwrx9F+OQ0uIvaJefuS3UTqW+ByOLyLIV+j0EH8ClNVxvFyH9Vmb08hCL4yje6mDYT5zMquShhypkd50RRzkg==",
"requires": { "requires": {
"cluster-key-slot": "^1.1.0", "cluster-key-slot": "^1.1.0",
"debug": "^4.3.1", "debug": "^4.3.1",
"denque": "^1.1.0", "denque": "^1.1.0",
"lodash.defaults": "^4.2.0", "lodash.defaults": "^4.2.0",
"lodash.flatten": "^4.4.0", "lodash.flatten": "^4.4.0",
"lodash.isarguments": "^3.1.0",
"p-map": "^2.1.0", "p-map": "^2.1.0",
"redis-commands": "1.7.0", "redis-commands": "1.7.0",
"redis-errors": "^1.2.0", "redis-errors": "^1.2.0",
@ -3452,6 +3536,15 @@
"is-extglob": "^2.1.1" "is-extglob": "^2.1.1"
} }
}, },
"is-nan": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/is-nan/-/is-nan-1.3.2.tgz",
"integrity": "sha512-E+zBKpQ2t6MEo1VsonYmluk9NxGrbzpeeLC2xIViuO2EjU2xsXsBPwTr3Ykv9l08UYEVEdWeRZNouaZqF6RN0w==",
"requires": {
"call-bind": "^1.0.0",
"define-properties": "^1.1.3"
}
},
"is-number": { "is-number": {
"version": "7.0.0", "version": "7.0.0",
"resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz",
@ -4997,6 +5090,11 @@
"resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz", "resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz",
"integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=" "integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8="
}, },
"lodash.isarguments": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz",
"integrity": "sha1-L1c9hcaiQon/AGY7SRwdM4/zRYo="
},
"lodash.merge": { "lodash.merge": {
"version": "4.6.2", "version": "4.6.2",
"resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz",
@ -5147,6 +5245,19 @@
"integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==", "integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==",
"dev": true "dev": true
}, },
"moment": {
"version": "2.29.1",
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.1.tgz",
"integrity": "sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ=="
},
"moment-timezone": {
"version": "0.5.33",
"resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.33.tgz",
"integrity": "sha512-PTc2vcT8K9J5/9rDEPe5czSIKgLoGsH8UNpA4qZTVw0Vd/Uz19geE9abbIOQKaAQFcnQ3v5YEXrbSc5BpshH+w==",
"requires": {
"moment": ">= 2.9.0"
}
},
"mri": { "mri": {
"version": "1.1.4", "version": "1.1.4",
"resolved": "https://registry.npmjs.org/mri/-/mri-1.1.4.tgz", "resolved": "https://registry.npmjs.org/mri/-/mri-1.1.4.tgz",
@ -5252,6 +5363,11 @@
"resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.10.3.tgz", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.10.3.tgz",
"integrity": "sha512-e5mCJlSH7poANfC8z8S9s9S2IN5/4Zb3aZ33f5s8YqoazCFzNLloLU8r5VCG+G7WoqLvAAZoVMcy3tp/3X0Plw==" "integrity": "sha512-e5mCJlSH7poANfC8z8S9s9S2IN5/4Zb3aZ33f5s8YqoazCFzNLloLU8r5VCG+G7WoqLvAAZoVMcy3tp/3X0Plw=="
}, },
"object-keys": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz",
"integrity": "sha512-NuAESUOUMrlIXOfHKzD6bpPu3tYt3xvjNdRIQ+FeT0lNb4K8WR70CaDxhuNguS2XG+GjkyMwOzsN5ZktImfhLA=="
},
"on-finished": { "on-finished": {
"version": "2.3.0", "version": "2.3.0",
"resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.3.0.tgz", "resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.3.0.tgz",
@ -6378,8 +6494,7 @@
"tslib": { "tslib": {
"version": "1.14.1", "version": "1.14.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz", "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz",
"integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==", "integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg=="
"dev": true
}, },
"tsutils": { "tsutils": {
"version": "3.21.0", "version": "3.21.0",
@ -6473,6 +6588,11 @@
"resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz", "resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz",
"integrity": "sha1-n5VxD1CiZ5R7LMwSR0HBAoQn5xM=" "integrity": "sha1-n5VxD1CiZ5R7LMwSR0HBAoQn5xM="
}, },
"uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg=="
},
"v8-compile-cache": { "v8-compile-cache": {
"version": "2.3.0", "version": "2.3.0",
"resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.3.0.tgz", "resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.3.0.tgz",

View file

@ -4,6 +4,7 @@
"description": "Asset Transfer Basic REST API implemented in TypeScript", "description": "Asset Transfer Basic REST API implemented in TypeScript",
"main": "dist/index.js", "main": "dist/index.js",
"dependencies": { "dependencies": {
"bullmq": "^1.47.2",
"dotenv": "^10.0.0", "dotenv": "^10.0.0",
"env-var": "^7.0.1", "env-var": "^7.0.1",
"express": "^4.17.1", "express": "^4.17.1",
@ -11,7 +12,7 @@
"fabric-network": "^2.2.8", "fabric-network": "^2.2.8",
"helmet": "^4.6.0", "helmet": "^4.6.0",
"http-status-codes": "^2.1.4", "http-status-codes": "^2.1.4",
"ioredis": "^4.27.6", "ioredis": "^4.27.8",
"passport": "^0.4.1", "passport": "^0.4.1",
"passport-headerapikey": "^1.2.2", "passport-headerapikey": "^1.2.2",
"pino": "^6.11.3", "pino": "^6.11.3",
@ -53,7 +54,7 @@
"start": "node --require source-map-support/register ./dist", "start": "node --require source-map-support/register ./dist",
"start:dotenv": "node --require source-map-support/register --require dotenv/config ./dist", "start:dotenv": "node --require source-map-support/register --require dotenv/config ./dist",
"start:dev": "node --require source-map-support/register --require dotenv/config ./dist | pino-pretty", "start:dev": "node --require source-map-support/register --require dotenv/config ./dist | pino-pretty",
"start:redis": "docker run -p 6379:6379 --name fabric-sample-redis -d redis", "start:redis": "docker run -p 6379:6379 --name fabric-sample-redis -d redis --maxmemory-policy noeviction",
"test": "jest" "test": "jest"
}, },
"author": "Hyperledger", "author": "Hyperledger",

View file

@ -1,178 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/
import { mock } from 'jest-mock-extended';
import { Contract, Network, Transaction } from 'fabric-network';
import { mocked } from 'ts-jest/utils';
import * as fabricProtos from 'fabric-protos';
const actualFabricNetwork = jest.requireActual('fabric-network');
const Wallet = actualFabricNetwork.Wallet;
const Wallets = actualFabricNetwork.Wallets;
const mockAsset1 = {
ID: 'asset1',
Color: 'blue',
Size: 5,
Owner: 'Tomoko',
AppraisedValue: 300,
};
const mockAsset1Buffer = Buffer.from(JSON.stringify(mockAsset1));
const mockAsset2 = {
ID: 'asset2',
Color: 'red',
Size: 5,
Owner: 'Brad',
AppraisedValue: 400,
};
const mockAllAssetsBuffer = Buffer.from(
JSON.stringify([mockAsset1, mockAsset2])
);
const mockBlockchainInfoProto = fabricProtos.common.BlockchainInfo.create();
mockBlockchainInfoProto.height = 42;
const mockBlockchainInfoBuffer = Buffer.from(
fabricProtos.common.BlockchainInfo.encode(mockBlockchainInfoProto).finish()
);
const processedTransactionProto =
fabricProtos.protos.ProcessedTransaction.create();
processedTransactionProto.validationCode =
fabricProtos.protos.TxValidationCode.VALID;
const processedTransactionBuffer = Buffer.from(
fabricProtos.protos.ProcessedTransaction.encode(
processedTransactionProto
).finish()
);
type FabricNetworkModule = jest.Mocked<typeof import('fabric-network')>;
const {
DefaultEventHandlerStrategies,
DefaultQueryHandlerStrategies,
Gateway,
}: FabricNetworkModule = jest.createMockFromModule('fabric-network');
const mockAssetExistsTransaction = mock<Transaction>();
mockAssetExistsTransaction.evaluate
.calledWith('asset1')
.mockResolvedValue(Buffer.from('true'));
mockAssetExistsTransaction.evaluate
.calledWith('asset3')
.mockResolvedValue(Buffer.from('false'));
const mockReadAssetTransaction = mock<Transaction>();
mockReadAssetTransaction.evaluate
.calledWith('asset1')
.mockResolvedValue(mockAsset1Buffer);
mockReadAssetTransaction.evaluate
.calledWith('asset3')
.mockRejectedValue(new Error('the asset asset3 does not exist'));
const mockCreateAssetTransaction = mock<Transaction>();
mockCreateAssetTransaction.getTransactionId.mockReturnValue('txn1');
mockCreateAssetTransaction.submit
.calledWith('asset1')
.mockRejectedValue(
new Error(
'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset1 already exists\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 already exists'
)
);
// NOTE: only the second mocked GetAllAssets with return no assets
// TODO find a better alternative so that test order does not matter
const mockGetAllAssetsTransaction = mock<Transaction>();
mockGetAllAssetsTransaction.evaluate
.mockResolvedValueOnce(Buffer.from(''))
.mockResolvedValueOnce(mockAllAssetsBuffer);
const mockUpdateAssetTransaction = mock<Transaction>();
mockUpdateAssetTransaction.getTransactionId.mockReturnValue('txn1');
mockUpdateAssetTransaction.submit
.calledWith('asset3')
.mockRejectedValue(
new Error(
'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset3 does not exist\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 does not exist'
)
);
const mockTransferAssetTransaction = mock<Transaction>();
mockTransferAssetTransaction.getTransactionId.mockReturnValue('txn1');
mockTransferAssetTransaction.submit
.calledWith('asset3')
.mockRejectedValue(
new Error(
'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset3 does not exist\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 does not exist'
)
);
const mockDeleteAssetTransaction = mock<Transaction>();
mockDeleteAssetTransaction.getTransactionId.mockReturnValue('txn1');
mockDeleteAssetTransaction.submit
.calledWith('asset3')
.mockRejectedValue(
new Error(
'No valid responses from any peers. Errors:\n peer=peer0.org1.example.com:7051, status=500, message=the asset asset3 does not exist\n peer=peer0.org2.example.com:9051, status=500, message=the asset asset3 does not exist'
)
);
const mockBasicContract = mock<Contract>();
mockBasicContract.createTransaction
.calledWith('AssetExists')
.mockReturnValue(mockAssetExistsTransaction);
mockBasicContract.createTransaction
.calledWith('ReadAsset')
.mockReturnValue(mockReadAssetTransaction);
mockBasicContract.createTransaction
.calledWith('CreateAsset')
.mockReturnValue(mockCreateAssetTransaction);
mockBasicContract.createTransaction
.calledWith('GetAllAssets')
.mockReturnValue(mockGetAllAssetsTransaction);
mockBasicContract.createTransaction
.calledWith('UpdateAsset')
.mockReturnValue(mockUpdateAssetTransaction);
mockBasicContract.createTransaction
.calledWith('TransferAsset')
.mockReturnValue(mockTransferAssetTransaction);
mockBasicContract.createTransaction
.calledWith('DeleteAsset')
.mockReturnValue(mockDeleteAssetTransaction);
const mockGetTransactionByIDTransaction = mock<Transaction>();
mockGetTransactionByIDTransaction.evaluate
.calledWith('mychannel', 'txn2')
.mockResolvedValue(processedTransactionBuffer);
mockGetTransactionByIDTransaction.evaluate
.calledWith('mychannel', 'txn3')
.mockRejectedValue(
new Error(
'Failed to get transaction with id txn3, error Entry not found in index'
)
);
const mockSystemContract = mock<Contract>();
mockSystemContract.evaluateTransaction
.calledWith('GetChainInfo')
.mockResolvedValue(mockBlockchainInfoBuffer);
mockSystemContract.createTransaction
.calledWith('GetTransactionByID')
.mockReturnValue(mockGetTransactionByIDTransaction);
const mockNetwork = mock<Network>();
mockNetwork.getContract.calledWith('basic').mockReturnValue(mockBasicContract);
mockNetwork.getContract.calledWith('qscc').mockReturnValue(mockSystemContract);
mocked(Gateway.prototype.getNetwork).mockResolvedValue(mockNetwork);
export {
DefaultEventHandlerStrategies,
DefaultQueryHandlerStrategies,
Contract,
Gateway,
Wallet,
Wallets,
};

View file

@ -2,20 +2,53 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
jest.mock('fabric-network'); import { Job, Queue } from 'bullmq';
jest.mock('ioredis', () => require('ioredis-mock/jest'));
import { createServer } from '../server';
import { Application } from 'express'; import { Application } from 'express';
import { Contract, Transaction } from 'fabric-network';
import * as fabricProtos from 'fabric-protos';
import { mock, MockProxy } from 'jest-mock-extended';
import { mocked } from 'ts-jest/utils';
import request from 'supertest'; import request from 'supertest';
import * as config from '../config';
import { createServer } from '../server';
jest.mock('../config');
jest.mock('bullmq');
const mockAsset1 = {
ID: 'asset1',
Color: 'blue',
Size: 5,
Owner: 'Tomoko',
AppraisedValue: 300,
};
const mockAsset1Buffer = Buffer.from(JSON.stringify(mockAsset1));
const mockAsset2 = {
ID: 'asset2',
Color: 'red',
Size: 5,
Owner: 'Brad',
AppraisedValue: 400,
};
const mockAllAssetsBuffer = Buffer.from(
JSON.stringify([mockAsset1, mockAsset2])
);
// TODO add tests for server errors // TODO add tests for server errors
// TODO implement 405 Method Not Allowed where appropriate and add tests
describe('Asset Transfer Besic REST API', () => { describe('Asset Transfer Besic REST API', () => {
let app: Application; let app: Application;
let mockJobQueue: MockProxy<Queue>;
beforeEach(async () => { beforeEach(async () => {
app = await createServer(); app = await createServer();
const mockJob = mock<Job>();
mockJob.id = '1';
mockJobQueue = mock<Queue>();
mockJobQueue.add.mockResolvedValue(mockJob);
app.set('jobq', mockJobQueue);
}); });
describe('/ready', () => { describe('/ready', () => {
@ -35,6 +68,31 @@ describe('Asset Transfer Besic REST API', () => {
describe('/live', () => { describe('/live', () => {
it('GET should respond with 200 OK json', async () => { it('GET should respond with 200 OK json', async () => {
const mockBlockchainInfoProto =
fabricProtos.common.BlockchainInfo.create();
mockBlockchainInfoProto.height = 42;
const mockBlockchainInfoBuffer = Buffer.from(
fabricProtos.common.BlockchainInfo.encode(
mockBlockchainInfoProto
).finish()
);
const mockOrg1QsccContract = mock<Contract>();
mockOrg1QsccContract.evaluateTransaction
.calledWith('GetChainInfo')
.mockResolvedValue(mockBlockchainInfoBuffer);
app.set(config.mspIdOrg1, {
qsccContract: mockOrg1QsccContract,
});
const mockOrg2QsccContract = mock<Contract>();
mockOrg2QsccContract.evaluateTransaction
.calledWith('GetChainInfo')
.mockResolvedValue(mockBlockchainInfoBuffer);
app.set(config.mspIdOrg2, {
qsccContract: mockOrg2QsccContract,
});
const response = await request(app).get('/live'); const response = await request(app).get('/live');
expect(response.statusCode).toEqual(200); expect(response.statusCode).toEqual(200);
expect(response.header).toHaveProperty( expect(response.header).toHaveProperty(
@ -49,6 +107,19 @@ describe('Asset Transfer Besic REST API', () => {
}); });
describe('/api/assets', () => { describe('/api/assets', () => {
let mockGetAllAssetsTransaction: MockProxy<Transaction>;
beforeEach(() => {
mockGetAllAssetsTransaction = mock<Transaction>();
const mockBasicContract = mock<Contract>();
mockBasicContract.createTransaction
.calledWith('GetAllAssets')
.mockReturnValue(mockGetAllAssetsTransaction);
app.set(config.mspIdOrg1, {
assetContract: mockBasicContract,
});
});
it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => { it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => {
const response = await request(app) const response = await request(app)
.get('/api/assets') .get('/api/assets')
@ -66,8 +137,8 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('GET should respond with an empty json array when there are no assets', async () => { it('GET should respond with an empty json array when there are no assets', async () => {
// NOTE: only the first mocked GetAllAssets with return no assets mockGetAllAssetsTransaction.evaluate.mockResolvedValue(Buffer.from(''));
// TODO find a better alternative so that test order does not matter
const response = await request(app) const response = await request(app)
.get('/api/assets') .get('/api/assets')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -80,8 +151,10 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('GET should respond with json array of assets', async () => { it('GET should respond with json array of assets', async () => {
// NOTE: only the second mocked GetAllAssets with return no assets mockGetAllAssetsTransaction.evaluate.mockResolvedValue(
// TODO find a better alternative so that test order does not matter mockAllAssetsBuffer
);
const response = await request(app) const response = await request(app)
.get('/api/assets') .get('/api/assets')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -180,37 +253,34 @@ describe('Asset Transfer Besic REST API', () => {
); );
expect(response.body).toEqual({ expect(response.body).toEqual({
status: 'Accepted', status: 'Accepted',
transactionId: 'txn1', jobId: '1',
timestamp: expect.any(String),
});
});
it('POST should respond with 409 conflict json when asset already exists', async () => {
const response = await request(app)
.post('/api/assets')
.send({
id: 'asset1',
color: 'blue',
size: 5,
owner: 'Tomoko',
appraisedValue: 300,
})
.set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(409);
expect(response.header).toHaveProperty(
'content-type',
'application/json; charset=utf-8'
);
expect(response.body).toEqual({
status: 'Conflict',
reason: 'ASSET_EXISTS',
message: 'the asset asset1 already exists',
timestamp: expect.any(String), timestamp: expect.any(String),
}); });
}); });
}); });
describe('/api/assets/:id', () => { describe('/api/assets/:id', () => {
let mockAssetExistsTransaction: MockProxy<Transaction>;
let mockReadAssetTransaction: MockProxy<Transaction>;
beforeEach(() => {
const mockBasicContract = mock<Contract>();
mockAssetExistsTransaction = mock<Transaction>();
mockBasicContract.createTransaction
.calledWith('AssetExists')
.mockReturnValue(mockAssetExistsTransaction);
mockReadAssetTransaction = mock<Transaction>();
mockBasicContract.createTransaction
.calledWith('ReadAsset')
.mockReturnValue(mockReadAssetTransaction);
app.set(config.mspIdOrg1, {
assetContract: mockBasicContract,
});
});
it('OPTIONS should respond with 401 unauthorized json when an invalid API key is specified', async () => { it('OPTIONS should respond with 401 unauthorized json when an invalid API key is specified', async () => {
const response = await request(app) const response = await request(app)
.options('/api/assets/asset1') .options('/api/assets/asset1')
@ -228,6 +298,10 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('OPTIONS should respond with 404 not found json without the allow header when there is no asset with the specified ID', async () => { it('OPTIONS should respond with 404 not found json without the allow header when there is no asset with the specified ID', async () => {
mockAssetExistsTransaction.evaluate
.calledWith('asset3')
.mockResolvedValue(Buffer.from('false'));
const response = await request(app) const response = await request(app)
.options('/api/assets/asset3') .options('/api/assets/asset3')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -244,6 +318,10 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('OPTIONS should respond with 200 OK json with the allow header', async () => { it('OPTIONS should respond with 200 OK json with the allow header', async () => {
mockAssetExistsTransaction.evaluate
.calledWith('asset1')
.mockResolvedValue(Buffer.from('true'));
const response = await request(app) const response = await request(app)
.options('/api/assets/asset1') .options('/api/assets/asset1')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -279,6 +357,10 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('GET should respond with 404 not found json when there is no asset with the specified ID', async () => { it('GET should respond with 404 not found json when there is no asset with the specified ID', async () => {
mockReadAssetTransaction.evaluate
.calledWith('asset3')
.mockRejectedValue(new Error('the asset asset3 does not exist'));
const response = await request(app) const response = await request(app)
.get('/api/assets/asset3') .get('/api/assets/asset3')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -294,6 +376,10 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('GET should respond with the asset json when the asset exists', async () => { it('GET should respond with the asset json when the asset exists', async () => {
mockReadAssetTransaction.evaluate
.calledWith('asset1')
.mockResolvedValue(mockAsset1Buffer);
const response = await request(app) const response = await request(app)
.get('/api/assets/asset1') .get('/api/assets/asset1')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -334,28 +420,6 @@ describe('Asset Transfer Besic REST API', () => {
}); });
}); });
it('PUT should respond with 404 not found json when there is no asset with the specified ID', async () => {
const response = await request(app)
.put('/api/assets/asset3')
.send({
id: 'asset3',
color: 'red',
size: 5,
owner: 'Brad',
appraisedValue: 400,
})
.set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(404);
expect(response.header).toHaveProperty(
'content-type',
'application/json; charset=utf-8'
);
expect(response.body).toEqual({
status: 'Not Found',
timestamp: expect.any(String),
});
});
it('PUT should respond with 400 bad request json when IDs do not match', async () => { it('PUT should respond with 400 bad request json when IDs do not match', async () => {
const response = await request(app) const response = await request(app)
.put('/api/assets/asset1') .put('/api/assets/asset1')
@ -429,7 +493,7 @@ describe('Asset Transfer Besic REST API', () => {
); );
expect(response.body).toEqual({ expect(response.body).toEqual({
status: 'Accepted', status: 'Accepted',
transactionId: 'txn1', jobId: '1',
timestamp: expect.any(String), timestamp: expect.any(String),
}); });
}); });
@ -451,22 +515,6 @@ describe('Asset Transfer Besic REST API', () => {
}); });
}); });
it('PATCH should respond with 404 not found json when there is no asset with the specified ID', async () => {
const response = await request(app)
.patch('/api/assets/asset3')
.send([{ op: 'replace', path: '/owner', value: 'Ashleigh' }])
.set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(404);
expect(response.header).toHaveProperty(
'content-type',
'application/json; charset=utf-8'
);
expect(response.body).toEqual({
status: 'Not Found',
timestamp: expect.any(String),
});
});
it('PATCH should respond with 400 bad request json for invalid patch op/path', async () => { it('PATCH should respond with 400 bad request json for invalid patch op/path', async () => {
const response = await request(app) const response = await request(app)
.patch('/api/assets/asset1') .patch('/api/assets/asset1')
@ -505,7 +553,7 @@ describe('Asset Transfer Besic REST API', () => {
); );
expect(response.body).toEqual({ expect(response.body).toEqual({
status: 'Accepted', status: 'Accepted',
transactionId: 'txn1', jobId: '1',
timestamp: expect.any(String), timestamp: expect.any(String),
}); });
}); });
@ -526,9 +574,45 @@ describe('Asset Transfer Besic REST API', () => {
}); });
}); });
it('DELETE should respond with 404 not found json when there is no asset with the specified ID', async () => { it('DELETE should respond with 202 accepted json', async () => {
const response = await request(app) const response = await request(app)
.delete('/api/assets/asset3') .delete('/api/assets/asset1')
.set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(202);
expect(response.header).toHaveProperty(
'content-type',
'application/json; charset=utf-8'
);
expect(response.body).toEqual({
status: 'Accepted',
jobId: '1',
timestamp: expect.any(String),
});
});
});
describe('/api/jobs/:id', () => {
it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => {
const response = await request(app)
.get('/api/jobs/1')
.set('X-Api-Key', 'NOTTHERIGHTAPIKEY');
expect(response.statusCode).toEqual(401);
expect(response.header).toHaveProperty(
'content-type',
'application/json; charset=utf-8'
);
expect(response.body).toEqual({
reason: 'NO_VALID_APIKEY',
status: 'Unauthorized',
timestamp: expect.any(String),
});
});
it('GET should respond with 404 not found json when there is no job with the specified ID', async () => {
mocked(Job.fromId).mockResolvedValue(undefined);
const response = await request(app)
.get('/api/jobs/3')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(404); expect(response.statusCode).toEqual(404);
expect(response.header).toHaveProperty( expect(response.header).toHaveProperty(
@ -541,24 +625,49 @@ describe('Asset Transfer Besic REST API', () => {
}); });
}); });
it('DELETE should respond with 202 accepted json', async () => { it('GET should respond with json details for the specified job ID', async () => {
const mockJob = mock<Job>();
mockJob.id = '2';
mockJob.data = {
transactionIds: ['txn1', 'txn2'],
};
mockJob.returnvalue = {
transactionError: 'Mock error',
transactionPayload: Buffer.from('Mock payload'),
};
mockJobQueue.getJob.calledWith('2').mockResolvedValue(mockJob);
const response = await request(app) const response = await request(app)
.delete('/api/assets/asset1') .get('/api/jobs/2')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
expect(response.statusCode).toEqual(202); expect(response.statusCode).toEqual(200);
expect(response.header).toHaveProperty( expect(response.header).toHaveProperty(
'content-type', 'content-type',
'application/json; charset=utf-8' 'application/json; charset=utf-8'
); );
expect(response.body).toEqual({ expect(response.body).toEqual({
status: 'Accepted', jobId: '2',
transactionId: 'txn1', transactionIds: ['txn1', 'txn2'],
timestamp: expect.any(String), transactionError: 'Mock error',
transactionPayload: 'Mock payload',
}); });
}); });
}); });
describe('/api/transactions/:id', () => { describe('/api/transactions/:id', () => {
let mockGetTransactionByIDTransaction: MockProxy<Transaction>;
beforeEach(() => {
mockGetTransactionByIDTransaction = mock<Transaction>();
const mockQsccContract = mock<Contract>();
mockQsccContract.createTransaction
.calledWith('GetTransactionByID')
.mockReturnValue(mockGetTransactionByIDTransaction);
app.set(config.mspIdOrg1, {
qsccContract: mockQsccContract,
});
});
it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => { it('GET should respond with 401 unauthorized json when an invalid API key is specified', async () => {
const response = await request(app) const response = await request(app)
.get('/api/transactions/txn1') .get('/api/transactions/txn1')
@ -576,6 +685,14 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('GET should respond with 404 not found json when there is no transaction with the specified ID', async () => { it('GET should respond with 404 not found json when there is no transaction with the specified ID', async () => {
mockGetTransactionByIDTransaction.evaluate
.calledWith('mychannel', 'txn3')
.mockRejectedValue(
new Error(
'Failed to get transaction with id txn3, error Entry not found in index'
)
);
const response = await request(app) const response = await request(app)
.get('/api/transactions/txn3') .get('/api/transactions/txn3')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -591,6 +708,19 @@ describe('Asset Transfer Besic REST API', () => {
}); });
it('GET should respond with json details for the specified transaction ID', async () => { it('GET should respond with json details for the specified transaction ID', async () => {
const processedTransactionProto =
fabricProtos.protos.ProcessedTransaction.create();
processedTransactionProto.validationCode =
fabricProtos.protos.TxValidationCode.VALID;
const processedTransactionBuffer = Buffer.from(
fabricProtos.protos.ProcessedTransaction.encode(
processedTransactionProto
).finish()
);
mockGetTransactionByIDTransaction.evaluate
.calledWith('mychannel', 'txn2')
.mockResolvedValue(processedTransactionBuffer);
const response = await request(app) const response = await request(app)
.get('/api/transactions/txn2') .get('/api/transactions/txn2')
.set('X-Api-Key', 'ORG1MOCKAPIKEY'); .set('X-Api-Key', 'ORG1MOCKAPIKEY');
@ -600,10 +730,8 @@ describe('Asset Transfer Besic REST API', () => {
'application/json; charset=utf-8' 'application/json; charset=utf-8'
); );
expect(response.body).toEqual({ expect(response.body).toEqual({
status: 'OK', transactionId: 'txn2',
progress: 'DONE',
validationCode: 'VALID', validationCode: 'VALID',
timestamp: expect.any(String),
}); });
}); });
}); });

View file

@ -1,32 +1,35 @@
/* /*
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
* *
* Note: this sample is intended to work with the basic asset transfer * This sample is intended to work with the basic asset transfer
* chaincode which imposes some constraints on what is possible here. * chaincode which imposes some constraints on what is possible here.
* *
* For example, * For example,
* - There is no validation for Asset IDs * - There is no validation for Asset IDs
* - There are no error codes from the chaincode * - There are no error codes from the chaincode
* *
* To avoid timeouts, long running tasks should be decoupled from HTTP request
* processing
*
* Submit transactions can potentially be very long running, especially if the
* transaction fails and needs to be retried one or more times
*
* To allow requests to respond quickly enough, this sample queues submit
* requests for processing asynchronously and immediately returns 202 Accepted
*/ */
import express, { Request, Response } from 'express'; import express, { Request, Response } from 'express';
import { body, validationResult } from 'express-validator'; import { body, validationResult } from 'express-validator';
import { Contract } from 'fabric-network'; import { Contract } from 'fabric-network';
import { getReasonPhrase, StatusCodes } from 'http-status-codes'; import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { Redis } from 'ioredis'; import { Queue } from 'bullmq';
import { AssetExistsError, AssetNotFoundError } from './errors'; import { AssetNotFoundError } from './errors';
import { evatuateTransaction, submitTransaction } from './fabric'; import { evatuateTransaction } from './fabric';
import { addSubmitTransactionJob } from './jobs';
import { logger } from './logger'; import { logger } from './logger';
const { const { ACCEPTED, BAD_REQUEST, INTERNAL_SERVER_ERROR, NOT_FOUND, OK } =
ACCEPTED, StatusCodes;
BAD_REQUEST,
CONFLICT,
INTERNAL_SERVER_ERROR,
NOT_FOUND,
OK,
} = StatusCodes;
export const assetsRouter = express.Router(); export const assetsRouter = express.Router();
@ -45,7 +48,7 @@ assetsRouter.get('/', async (req: Request, res: Response) => {
return res.status(OK).json(assets); return res.status(OK).json(assets);
} catch (err) { } catch (err) {
logger.error(err, 'Error processing get all assets request'); logger.error({ err }, 'Error processing get all assets request');
return res.status(INTERNAL_SERVER_ERROR).json({ return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR), status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@ -76,14 +79,12 @@ assetsRouter.post(
} }
const mspId = req.user as string; const mspId = req.user as string;
const contract = req.app.get(mspId).assetContract as Contract;
const redis = req.app.get('redis') as Redis;
const assetId = req.body.id; const assetId = req.body.id;
try { try {
const transactionId = await submitTransaction( const submitQueue = req.app.get('jobq') as Queue;
contract, const jobId = await addSubmitTransactionJob(
redis, submitQueue,
mspId, mspId,
'CreateAsset', 'CreateAsset',
assetId, assetId,
@ -95,26 +96,16 @@ assetsRouter.post(
return res.status(ACCEPTED).json({ return res.status(ACCEPTED).json({
status: getReasonPhrase(ACCEPTED), status: getReasonPhrase(ACCEPTED),
transactionId: transactionId, jobId: jobId,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });
} catch (err) { } catch (err) {
logger.error( logger.error(
err, { err },
'Error processing create asset request for asset ID %s with transaction ID %s', 'Error processing create asset request for asset ID %s',
assetId, assetId
err.transactionId
); );
if (err instanceof AssetExistsError) {
return res.status(CONFLICT).json({
status: getReasonPhrase(CONFLICT),
reason: 'ASSET_EXISTS',
message: err.message,
timestamp: new Date().toISOString(),
});
}
return res.status(INTERNAL_SERVER_ERROR).json({ return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR), status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@ -152,7 +143,7 @@ assetsRouter.options('/:assetId', async (req: Request, res: Response) => {
} }
} catch (err) { } catch (err) {
logger.error( logger.error(
err, { err },
'Error processing asset options request for asset ID %s', 'Error processing asset options request for asset ID %s',
assetId assetId
); );
@ -177,7 +168,7 @@ assetsRouter.get('/:assetId', async (req: Request, res: Response) => {
return res.status(OK).json(asset); return res.status(OK).json(asset);
} catch (err) { } catch (err) {
logger.error( logger.error(
err, { err },
'Error processing read asset request for asset ID %s', 'Error processing read asset request for asset ID %s',
assetId assetId
); );
@ -228,14 +219,12 @@ assetsRouter.put(
} }
const mspId = req.user as string; const mspId = req.user as string;
const contract = req.app.get(mspId).assetContract as Contract;
const redis = req.app.get('redis') as Redis;
const assetId = req.params.assetId; const assetId = req.params.assetId;
try { try {
const transactionId = await submitTransaction( const submitQueue = req.app.get('jobq') as Queue;
contract, const jobId = await addSubmitTransactionJob(
redis, submitQueue,
mspId, mspId,
'UpdateAsset', 'UpdateAsset',
assetId, assetId,
@ -247,24 +236,16 @@ assetsRouter.put(
return res.status(ACCEPTED).json({ return res.status(ACCEPTED).json({
status: getReasonPhrase(ACCEPTED), status: getReasonPhrase(ACCEPTED),
transactionId: transactionId, jobId: jobId,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });
} catch (err) { } catch (err) {
logger.error( logger.error(
err, { err },
'Error processing update asset request for asset ID %s with transaction ID %s', 'Error processing update asset request for asset ID %s',
assetId, assetId
err.transactionId
); );
if (err instanceof AssetNotFoundError) {
return res.status(NOT_FOUND).json({
status: getReasonPhrase(NOT_FOUND),
timestamp: new Date().toISOString(),
});
}
return res.status(INTERNAL_SERVER_ERROR).json({ return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR), status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@ -299,15 +280,13 @@ assetsRouter.patch(
} }
const mspId = req.user as string; const mspId = req.user as string;
const contract = req.app.get(mspId).assetContract as Contract;
const redis = req.app.get('redis') as Redis;
const assetId = req.params.assetId; const assetId = req.params.assetId;
const newOwner = req.body[0].value; const newOwner = req.body[0].value;
try { try {
const transactionId = await submitTransaction( const submitQueue = req.app.get('jobq') as Queue;
contract, const jobId = await addSubmitTransactionJob(
redis, submitQueue,
mspId, mspId,
'TransferAsset', 'TransferAsset',
assetId, assetId,
@ -316,24 +295,16 @@ assetsRouter.patch(
return res.status(ACCEPTED).json({ return res.status(ACCEPTED).json({
status: getReasonPhrase(ACCEPTED), status: getReasonPhrase(ACCEPTED),
transactionId: transactionId, jobId: jobId,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });
} catch (err) { } catch (err) {
logger.error( logger.error(
err, { err },
'Error processing update asset request for asset ID %s with transaction ID %s', 'Error processing update asset request for asset ID %s',
req.params.assetId, req.params.assetId
err.transactionId
); );
if (err instanceof AssetNotFoundError) {
return res.status(NOT_FOUND).json({
status: getReasonPhrase(NOT_FOUND),
timestamp: new Date().toISOString(),
});
}
return res.status(INTERNAL_SERVER_ERROR).json({ return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR), status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@ -346,14 +317,12 @@ assetsRouter.delete('/:assetId', async (req: Request, res: Response) => {
logger.debug(req.body, 'Delete asset request received'); logger.debug(req.body, 'Delete asset request received');
const mspId = req.user as string; const mspId = req.user as string;
const contract = req.app.get(mspId).assetContract as Contract;
const redis = req.app.get('redis') as Redis;
const assetId = req.params.assetId; const assetId = req.params.assetId;
try { try {
const transactionId = await submitTransaction( const submitQueue = req.app.get('jobq') as Queue;
contract, const jobId = await addSubmitTransactionJob(
redis, submitQueue,
mspId, mspId,
'DeleteAsset', 'DeleteAsset',
assetId assetId
@ -361,24 +330,16 @@ assetsRouter.delete('/:assetId', async (req: Request, res: Response) => {
return res.status(ACCEPTED).json({ return res.status(ACCEPTED).json({
status: getReasonPhrase(ACCEPTED), status: getReasonPhrase(ACCEPTED),
transactionId: transactionId, jobId: jobId,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });
} catch (err) { } catch (err) {
logger.error( logger.error(
err, { err },
'Error processing delete asset request for asset ID %s with transaction ID %s', 'Error processing delete asset request for asset ID %s',
assetId, assetId
err.transactionId
); );
if (err instanceof AssetNotFoundError) {
return res.status(NOT_FOUND).json({
status: getReasonPhrase(NOT_FOUND),
timestamp: new Date().toISOString(),
});
}
return res.status(INTERNAL_SERVER_ERROR).json({ return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR), status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),

View file

@ -60,46 +60,156 @@ describe('Config values', () => {
}); });
}); });
describe('retryDelay', () => { describe('submitJobBackoffType', () => {
it('defaults to "3000"', () => { it('defaults to "fixed"', () => {
const config = require('./config'); const config = require('./config');
expect(config.retryDelay).toBe(3000); expect(config.submitJobBackoffType).toBe('fixed');
}); });
it('can be configured using the "RETRY_DELAY" environment variable', () => { it('can be configured using the "SUBMIT_JOB_BACKOFF_TYPE" environment variable', () => {
process.env.RETRY_DELAY = '9999'; process.env.SUBMIT_JOB_BACKOFF_TYPE = 'exponential';
const config = require('./config'); const config = require('./config');
expect(config.retryDelay).toBe(9999); expect(config.submitJobBackoffType).toBe('exponential');
}); });
it('throws an error when the "RETRY_DELAY" environment variable has an invalid number', () => { it('throws an error when the "LOG_LEVEL" environment variable has an invalid log level', () => {
process.env.RETRY_DELAY = 'short'; process.env.SUBMIT_JOB_BACKOFF_TYPE = 'jitter';
expect(() => { expect(() => {
require('./config'); require('./config');
}).toThrow( }).toThrow(
'env-var: "RETRY_DELAY" should be a valid integer. An example of a valid value would be: 3000' 'env-var: "SUBMIT_JOB_BACKOFF_TYPE" should be one of [fixed, exponential]'
); );
}); });
}); });
describe('maxRetryCount', () => { describe('submitJobBackoffDelay', () => {
it('defaults to "5"', () => { it('defaults to "3000"', () => {
const config = require('./config'); const config = require('./config');
expect(config.maxRetryCount).toBe(5); expect(config.submitJobBackoffDelay).toBe(3000);
}); });
it('can be configured using the "MAX_RETRY_COUNT" environment variable', () => { it('can be configured using the "SUBMIT_JOB_BACKOFF_DELAY" environment variable', () => {
process.env.MAX_RETRY_COUNT = '9999'; process.env.SUBMIT_JOB_BACKOFF_DELAY = '9999';
const config = require('./config'); const config = require('./config');
expect(config.maxRetryCount).toBe(9999); expect(config.submitJobBackoffDelay).toBe(9999);
}); });
it('throws an error when the "MAX_RETRY_COUNT" environment variable has an invalid number', () => { it('throws an error when the "SUBMIT_JOB_BACKOFF_DELAY" environment variable has an invalid number', () => {
process.env.MAX_RETRY_COUNT = 'lots'; process.env.SUBMIT_JOB_BACKOFF_DELAY = 'short';
expect(() => { expect(() => {
require('./config'); require('./config');
}).toThrow( }).toThrow(
'env-var: "MAX_RETRY_COUNT" should be a valid integer. An example of a valid value would be: 5' 'env-var: "SUBMIT_JOB_BACKOFF_DELAY" should be a valid integer. An example of a valid value would be: 3000'
);
});
});
describe('submitJobAttempts', () => {
it('defaults to "5"', () => {
const config = require('./config');
expect(config.submitJobAttempts).toBe(5);
});
it('can be configured using the "SUBMIT_JOB_ATTEMPTS" environment variable', () => {
process.env.SUBMIT_JOB_ATTEMPTS = '9999';
const config = require('./config');
expect(config.submitJobAttempts).toBe(9999);
});
it('throws an error when the "SUBMIT_JOB_ATTEMPTS" environment variable has an invalid number', () => {
process.env.SUBMIT_JOB_ATTEMPTS = 'lots';
expect(() => {
require('./config');
}).toThrow(
'env-var: "SUBMIT_JOB_ATTEMPTS" should be a valid integer. An example of a valid value would be: 5'
);
});
});
describe('submitJobConcurrency', () => {
it('defaults to "5"', () => {
const config = require('./config');
expect(config.submitJobConcurrency).toBe(5);
});
it('can be configured using the "SUBMIT_JOB_CONCURRENCY" environment variable', () => {
process.env.SUBMIT_JOB_CONCURRENCY = '9999';
const config = require('./config');
expect(config.submitJobConcurrency).toBe(9999);
});
it('throws an error when the "SUBMIT_JOB_CONCURRENCY" environment variable has an invalid number', () => {
process.env.SUBMIT_JOB_CONCURRENCY = 'lots';
expect(() => {
require('./config');
}).toThrow(
'env-var: "SUBMIT_JOB_CONCURRENCY" should be a valid integer. An example of a valid value would be: 5'
);
});
});
describe('maxCompletedSubmitJobs', () => {
it('defaults to "1000"', () => {
const config = require('./config');
expect(config.maxCompletedSubmitJobs).toBe(1000);
});
it('can be configured using the "MAX_COMPLETED_SUBMIT_JOBS" environment variable', () => {
process.env.MAX_COMPLETED_SUBMIT_JOBS = '9999';
const config = require('./config');
expect(config.maxCompletedSubmitJobs).toBe(9999);
});
it('throws an error when the "MAX_COMPLETED_SUBMIT_JOBS" environment variable has an invalid number', () => {
process.env.MAX_COMPLETED_SUBMIT_JOBS = 'lots';
expect(() => {
require('./config');
}).toThrow(
'env-var: "MAX_COMPLETED_SUBMIT_JOBS" should be a valid integer. An example of a valid value would be: 1000'
);
});
});
describe('maxFailedSubmitJobs', () => {
it('defaults to "1000"', () => {
const config = require('./config');
expect(config.maxFailedSubmitJobs).toBe(1000);
});
it('can be configured using the "MAX_FAILED_SUBMIT_JOBS" environment variable', () => {
process.env.MAX_FAILED_SUBMIT_JOBS = '9999';
const config = require('./config');
expect(config.maxFailedSubmitJobs).toBe(9999);
});
it('throws an error when the "MAX_FAILED_SUBMIT_JOBS" environment variable has an invalid number', () => {
process.env.MAX_FAILED_SUBMIT_JOBS = 'lots';
expect(() => {
require('./config');
}).toThrow(
'env-var: "MAX_FAILED_SUBMIT_JOBS" should be a valid integer. An example of a valid value would be: 1000'
);
});
});
describe('submitJobQueueScheduler', () => {
it('defaults to "true"', () => {
const config = require('./config');
expect(config.submitJobQueueScheduler).toBe(true);
});
it('can be configured using the "SUBMIT_JOB_QUEUE_SCHEDULER" environment variable', () => {
process.env.SUBMIT_JOB_QUEUE_SCHEDULER = 'false';
const config = require('./config');
expect(config.submitJobQueueScheduler).toBe(false);
});
it('throws an error when the "SUBMIT_JOB_QUEUE_SCHEDULER" environment variable has an invalid boolean value', () => {
process.env.SUBMIT_JOB_QUEUE_SCHEDULER = '11';
expect(() => {
require('./config');
}).toThrow(
'env-var: "SUBMIT_JOB_QUEUE_SCHEDULER" should be either "true", "false", "TRUE", or "FALSE". An example of a valid value would be: true'
); );
}); });
}); });
@ -152,28 +262,6 @@ describe('Config values', () => {
}); });
}); });
describe('blockListenerOrg', () => {
it('defaults to "Org1"', () => {
const config = require('./config');
expect(config.blockListenerOrg).toBe('Org1');
});
it('can be configured using the "HLF_BLOCK_LISTENER_ORG" environment variable', () => {
process.env.HLF_BLOCK_LISTENER_ORG = 'Org2';
const config = require('./config');
expect(config.blockListenerOrg).toBe('Org2');
});
it('throws an error when the "HLF_BLOCK_LISTENER_ORG" environment variable has an invalid value', () => {
process.env.HLF_BLOCK_LISTENER_ORG = 'Org3';
expect(() => {
require('./config');
}).toThrow(
'env-var: "HLF_BLOCK_LISTENER_ORG" should be one of [Org1, Org2]'
);
});
});
describe('channelName', () => { describe('channelName', () => {
it('defaults to "mychannel"', () => { it('defaults to "mychannel"', () => {
const config = require('./config'); const config = require('./config');

View file

@ -7,6 +7,8 @@ import * as env from 'env-var';
export const ORG1 = 'Org1'; export const ORG1 = 'Org1';
export const ORG2 = 'Org2'; export const ORG2 = 'Org2';
export const JOB_QUEUE_NAME = 'submit';
/* /*
* Log level for the REST server * Log level for the REST server
*/ */
@ -25,23 +27,69 @@ export const port = env
.asPortNumber(); .asPortNumber();
/* /*
* The delay between each retry attempt in milliseconds * The type of backoff to use for retrying failed submit jobs
*/ */
export const retryDelay = env export const submitJobBackoffType = env
.get('RETRY_DELAY') .get('SUBMIT_JOB_BACKOFF_TYPE')
.default('fixed')
.asEnum(['fixed', 'exponential']);
/*
* Backoff delay for retrying failed submit jobs in milliseconds
*/
export const submitJobBackoffDelay = env
.get('SUBMIT_JOB_BACKOFF_DELAY')
.default('3000') .default('3000')
.example('3000') .example('3000')
.asIntPositive(); .asIntPositive();
/* /*
* The maximum number of times to retry a failing transaction * The total number of attempts to try a submit job until it completes
*/ */
export const maxRetryCount = env export const submitJobAttempts = env
.get('MAX_RETRY_COUNT') .get('SUBMIT_JOB_ATTEMPTS')
.default('5') .default('5')
.example('5') .example('5')
.asIntPositive(); .asIntPositive();
/*
* The maximum number of submit jobs that can be processed in parallel
*/
export const submitJobConcurrency = env
.get('SUBMIT_JOB_CONCURRENCY')
.default('5')
.example('5')
.asIntPositive();
/*
* The number of completed submit jobs to keep
*/
export const maxCompletedSubmitJobs = env
.get('MAX_COMPLETED_SUBMIT_JOBS')
.default('1000')
.example('1000')
.asIntPositive();
/*
* The number of failed submit jobs to keep
*/
export const maxFailedSubmitJobs = env
.get('MAX_FAILED_SUBMIT_JOBS')
.default('1000')
.example('1000')
.asIntPositive();
/*
* Whether to initialise a scheduler for the submit job queue
* There must be at least on queue scheduler to handle retries and you may want
* more than one for redundancy
*/
export const submitJobQueueScheduler = env
.get('SUBMIT_JOB_QUEUE_SCHEDULER')
.default('true')
.example('true')
.asBoolStrict();
/* /*
* Whether to convert discovered host addresses to be 'localhost' * Whether to convert discovered host addresses to be 'localhost'
* This should be set to 'true' when running a docker composed fabric network on the * This should be set to 'true' when running a docker composed fabric network on the
@ -71,14 +119,6 @@ export const mspIdOrg2 = env
.example(`${ORG2}MSP`) .example(`${ORG2}MSP`)
.asString(); .asString();
/*
* The block listener org
*/
export const blockListenerOrg = env
.get('HLF_BLOCK_LISTENER_ORG')
.default(ORG1)
.asEnum([ORG1, ORG2]);
/* /*
* Name of the channel which the basic asset sample chaincode has been installed on * Name of the channel which the basic asset sample chaincode has been installed on
*/ */
@ -205,7 +245,7 @@ export const redisPort = env
*/ */
export const redisUsername = env export const redisUsername = env
.get('REDIS_USERNAME') .get('REDIS_USERNAME')
.example('conga') .example('fabric')
.asString(); .asString();
/* /*

View file

@ -5,15 +5,55 @@
import { import {
AssetExistsError, AssetExistsError,
AssetNotFoundError, AssetNotFoundError,
TransactionError,
TransactionNotFoundError, TransactionNotFoundError,
handleError, handleError,
isDuplicateTransactionError, isDuplicateTransactionError,
isErrorLike,
} from './errors'; } from './errors';
describe('Errors', () => { describe('Errors', () => {
describe('isErrorLike', () => {
it('returns false for null', () => {
expect(isErrorLike(null)).toBe(false);
});
it('returns false for undefined', () => {
expect(isErrorLike(undefined)).toBe(false);
});
it('returns false for empty object', () => {
expect(isErrorLike({})).toBe(false);
});
it('returns false for string', () => {
expect(isErrorLike('true')).toBe(false);
});
it('returns false for non-error object', () => {
expect(isErrorLike({ size: 42 })).toBe(false);
});
it('returns false for invalid error object', () => {
expect(isErrorLike({ name: 'MockError', message: 42 })).toBe(false);
});
it('returns false for error like object with invalid stack', () => {
expect(
isErrorLike({ name: 'MockError', message: 'Fail', stack: false })
).toBe(false);
});
it('returns true for error like object', () => {
expect(isErrorLike({ name: 'MockError', message: 'Fail' })).toBe(true);
});
it('returns true for new Error', () => {
expect(isErrorLike(new Error('Error'))).toBe(true);
});
});
describe('isDuplicateTransactionError', () => { describe('isDuplicateTransactionError', () => {
it('returns true for an error with duplicate transaction endorsement details', () => { it('returns true for an error when all endorsement details are duplicate transaction found', () => {
const mockDuplicateTransactionError = { const mockDuplicateTransactionError = {
errors: [ errors: [
{ {
@ -21,6 +61,36 @@ describe('Errors', () => {
{ {
details: 'duplicate transaction found', details: 'duplicate transaction found',
}, },
{
details: 'duplicate transaction found',
},
{
details: 'duplicate transaction found',
},
],
},
],
};
expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe(
true
);
});
it('returns true for an error when at least one endorsement details are duplicate transaction found', () => {
const mockDuplicateTransactionError = {
errors: [
{
endorsements: [
{
details: 'duplicate transaction found',
},
{
details: 'mock endorsement details',
},
{
details: 'mock endorsement details',
},
], ],
}, },
], ],
@ -39,6 +109,9 @@ describe('Errors', () => {
{ {
details: 'mock endorsement details', details: 'mock endorsement details',
}, },
{
details: 'mock endorsement details',
},
], ],
}, },
], ],
@ -48,6 +121,38 @@ describe('Errors', () => {
false false
); );
}); });
it('returns false for an error without endorsement details', () => {
const mockDuplicateTransactionError = {
errors: [
{
rejections: [
{
details: 'duplicate transaction found',
},
],
},
],
};
expect(isDuplicateTransactionError(mockDuplicateTransactionError)).toBe(
false
);
});
it('returns false for a basic Error object without endorsement details', () => {
expect(
isDuplicateTransactionError(new Error('duplicate transaction found'))
).toBe(false);
});
it('returns false for an undefined error', () => {
expect(isDuplicateTransactionError(undefined)).toBe(false);
});
it('returns false for a null error', () => {
expect(isDuplicateTransactionError(null)).toBe(false);
});
}); });
describe('handleError', () => { describe('handleError', () => {
@ -77,25 +182,27 @@ describe('Errors', () => {
} }
); );
it('returns a TransactionNotFoundError for errors with a transaction not found message', () => { it.each([
expect( 'Failed to get transaction with id txn, error Entry not found in index',
handleError( 'Failed to get transaction with id txn, error no such transaction ID [txn] in index',
'txn1', ])(
new Error( 'returns a TransactionNotFoundError for errors with a transaction not found message: %s',
'Failed to get transaction with id txn, error Entry not found in index' (msg) => {
) expect(handleError('txn1', new Error(msg))).toStrictEqual(
) new TransactionNotFoundError(msg, 'txn1')
).toStrictEqual( );
new TransactionNotFoundError( }
'Failed to get transaction with id txn, error Entry not found in index', );
'txn1'
) it('returns the original error for errors with other messages', () => {
expect(handleError('txn1', new Error('MOCK ERROR'))).toStrictEqual(
new Error('MOCK ERROR')
); );
}); });
it('returns a TransactionError for errors with other messages', () => { it('returns a new Error object for errors of other types', () => {
expect(handleError('txn1', new Error('MOCK ERROR'))).toStrictEqual( expect(handleError('txn1', 42)).toStrictEqual(
new TransactionError('Transaction error', 'txn1') new Error('Unhandled error: 42')
); );
}); });
}); });

View file

@ -4,23 +4,23 @@
import { logger } from './logger'; import { logger } from './logger';
export class TransactionError extends Error { export class ContractError extends Error {
transactionId: string; transactionId: string;
constructor(message: string, transactionId: string) { constructor(message: string, transactionId: string) {
super(message); super(message);
Object.setPrototypeOf(this, TransactionError.prototype); Object.setPrototypeOf(this, ContractError.prototype);
this.name = 'TransactionError'; this.name = 'TransactionError';
this.transactionId = transactionId; this.transactionId = transactionId;
} }
} }
export class TransactionNotFoundError extends Error { export class TransactionNotFoundError extends ContractError {
transactionId: string; transactionId: string;
constructor(message: string, transactionId: string) { constructor(message: string, transactionId: string) {
super(message); super(message, transactionId);
Object.setPrototypeOf(this, TransactionNotFoundError.prototype); Object.setPrototypeOf(this, TransactionNotFoundError.prototype);
this.name = 'TransactionNotFoundError'; this.name = 'TransactionNotFoundError';
@ -28,7 +28,7 @@ export class TransactionNotFoundError extends Error {
} }
} }
export class AssetExistsError extends TransactionError { export class AssetExistsError extends ContractError {
constructor(message: string, transactionId: string) { constructor(message: string, transactionId: string) {
super(message, transactionId); super(message, transactionId);
Object.setPrototypeOf(this, AssetExistsError.prototype); Object.setPrototypeOf(this, AssetExistsError.prototype);
@ -37,7 +37,7 @@ export class AssetExistsError extends TransactionError {
} }
} }
export class AssetNotFoundError extends TransactionError { export class AssetNotFoundError extends ContractError {
constructor(message: string, transactionId: string) { constructor(message: string, transactionId: string) {
super(message, transactionId); super(message, transactionId);
Object.setPrototypeOf(this, AssetNotFoundError.prototype); Object.setPrototypeOf(this, AssetNotFoundError.prototype);
@ -46,6 +46,29 @@ export class AssetNotFoundError extends TransactionError {
} }
} }
export class JobNotFoundError extends Error {
jobId: string;
constructor(message: string, jobId: string) {
super(message);
Object.setPrototypeOf(this, JobNotFoundError.prototype);
this.name = 'JobNotFoundError';
this.jobId = jobId;
}
}
export const isErrorLike = (err: unknown): err is Error => {
return (
err != undefined &&
err != null &&
typeof (err as Error).name === 'string' &&
typeof (err as Error).message === 'string' &&
((err as Error).stack === undefined ||
typeof (err as Error).stack === 'string')
);
};
/* /*
* Checks whether an error was caused by a duplicate transaction. * Checks whether an error was caused by a duplicate transaction.
* *
@ -54,19 +77,103 @@ export class AssetNotFoundError extends TransactionError {
* DUPLICATE_TXID TxValidationCode somehow but that does not seem to be * DUPLICATE_TXID TxValidationCode somehow but that does not seem to be
* possible. * possible.
*/ */
export const isDuplicateTransactionError = (error: { export const isDuplicateTransactionError = (err: unknown): boolean => {
errors: { endorsements: { details: string }[] }[]; if (err === undefined || err === null) return false;
}): boolean => {
try {
const isDuplicateTxn = error?.errors?.some((err) =>
err?.endorsements?.some((endorsement) =>
endorsement?.details?.startsWith('duplicate transaction found')
)
);
return isDuplicateTxn; const endorsementError = err as {
} catch (err) { errors: { endorsements: { details: string }[] }[];
logger.warn(err, 'Error checking for duplicate transaction'); };
const isDuplicate = endorsementError?.errors?.some((err) =>
err?.endorsements?.some((endorsement) =>
endorsement?.details?.startsWith('duplicate transaction found')
)
);
return isDuplicate === true;
};
/*
* Matches asset already exists error strings from the asset contract
*
* The regex needs to match the following error messages:
* "the asset %s already exists"
* "The asset ${id} already exists"
* "Asset %s already exists"
*/
const matchAssetAlreadyExistsMessage = (message: string): string | null => {
//
const assetAlreadyExistsRegex = /([tT]he )?[aA]sset \w* already exists/g;
const assetAlreadyExistsMatch = message.match(assetAlreadyExistsRegex);
logger.debug(
{ message: message, result: assetAlreadyExistsMatch },
'Checking for asset already exists message'
);
if (assetAlreadyExistsMatch !== null) {
return assetAlreadyExistsMatch[0];
}
return null;
};
/*
* Matches asset does not exist error strings from the asset contract
*
* The regex needs to match the following error messages:
* "the asset %s does not exist"
* "The asset ${id} does not exist"
* "Asset %s does not exist"
*/
const matchAssetDoesNotExistMessage = (message: string): string | null => {
const assetDoesNotExistRegex = /([tT]he )?[aA]sset \w* does not exist/g;
const assetDoesNotExistMatch = message.match(assetDoesNotExistRegex);
logger.debug(
{ message: message, result: assetDoesNotExistMatch },
'Checking for asset does not exist message'
);
if (assetDoesNotExistMatch !== null) {
return assetDoesNotExistMatch[0];
}
return null;
};
/*
* Matches transaction does not exist error strings from the contract API
*
* The regex needs to match the following error messages:
* "Failed to get transaction with id %s, error Entry not found in index"
* "Failed to get transaction with id %s, error no such transaction ID [%s] in index"
*/
const matchTransactionDoesNotExistMessage = (
message: string
): string | null => {
const transactionDoesNotExistRegex =
/Failed to get transaction with id [^,]*, error (?:(?:Entry not found)|(?:no such transaction ID \[[^\]]*\])) in index/g;
const transactionDoesNotExistMatch = message.match(
transactionDoesNotExistRegex
);
logger.debug(
{ message: message, result: transactionDoesNotExistMatch },
'Checking for transaction does not exist message'
);
if (transactionDoesNotExistMatch !== null) {
return transactionDoesNotExistMatch[0];
}
return null;
};
export const isContractError = (err: unknown): boolean => {
if (
err instanceof AssetExistsError ||
err instanceof AssetNotFoundError ||
err instanceof TransactionNotFoundError
) {
return true;
} }
return false; return false;
@ -80,56 +187,32 @@ export const isDuplicateTransactionError = (error: {
* again it's the only option. The error message text is not even the same for * again it's the only option. The error message text is not even the same for
* the Go, Java, and Javascript implementations of the chaincode! * the Go, Java, and Javascript implementations of the chaincode!
*/ */
export const handleError = (transactionId: string, err: Error): Error => { export const handleError = (transactionId: string, err: unknown): Error => {
// This regex needs to match the following error messages: logger.debug({ transactionId: transactionId, err }, 'Processing error');
// "the asset %s already exists"
// "The asset ${id} already exists"
// "Asset %s already exists"
const assetAlreadyExistsRegex = /([tT]he )?[aA]sset \w* already exists/g;
const assetAlreadyExistsMatch = err.message.match(assetAlreadyExistsRegex);
logger.debug(
{ message: err.message, result: assetAlreadyExistsMatch },
'Checking for asset already exists message'
);
if (assetAlreadyExistsMatch) {
return new AssetExistsError(assetAlreadyExistsMatch[0], transactionId);
}
// This regex needs to match the following error messages: if (isErrorLike(err)) {
// "the asset %s does not exist" const assetAlreadyExistsMatch = matchAssetAlreadyExistsMessage(err.message);
// "The asset ${id} does not exist" if (assetAlreadyExistsMatch !== null) {
// "Asset %s does not exist" return new AssetExistsError(assetAlreadyExistsMatch, transactionId);
const assetDoesNotExistRegex = /([tT]he )?[aA]sset \w* does not exist/g; }
const assetDoesNotExistMatch = err.message.match(assetDoesNotExistRegex);
logger.debug(
{ message: err.message, result: assetDoesNotExistMatch },
'Checking for asset does not exist message'
);
if (assetDoesNotExistMatch) {
return new AssetNotFoundError(assetDoesNotExistMatch[0], transactionId);
}
// This regex needs to match the following error messages: const assetDoesNotExistMatch = matchAssetDoesNotExistMessage(err.message);
// "Failed to get transaction with id %s, error Entry not found in index" if (assetDoesNotExistMatch !== null) {
const transactionDoesNotExistRegex = return new AssetNotFoundError(assetDoesNotExistMatch, transactionId);
/Failed to get transaction with id [^,]*, error Entry not found in index/g; }
const transactionDoesNotExistMatch = err.message.match(
transactionDoesNotExistRegex const transactionDoesNotExistMatch = matchTransactionDoesNotExistMessage(
); err.message
logger.debug(
{ message: err.message, result: transactionDoesNotExistMatch },
'Checking for transaction does not exist message'
);
if (transactionDoesNotExistMatch) {
return new TransactionNotFoundError(
transactionDoesNotExistMatch[0],
transactionId
); );
if (transactionDoesNotExistMatch !== null) {
return new TransactionNotFoundError(
transactionDoesNotExistMatch,
transactionId
);
}
return err;
} }
logger.error( return new Error(`Unhandled error: ${err}`);
{ transactionId: transactionId, error: err },
'Unhandled transaction error'
);
return new TransactionError('Transaction error', transactionId);
}; };

View file

@ -10,69 +10,49 @@ import {
evatuateTransaction, evatuateTransaction,
submitTransaction, submitTransaction,
getBlockHeight, getBlockHeight,
startRetryLoop, getTransactionValidationCode,
blockEventHandler, processSubmitTransactionJob,
} from './fabric'; } from './fabric';
import * as config from './config'; import * as config from './config';
import { import {
AssetExistsError, AssetExistsError,
AssetNotFoundError, AssetNotFoundError,
TransactionError,
TransactionNotFoundError, TransactionNotFoundError,
} from './errors'; } from './errors';
import { import {
BlockEvent,
Contract, Contract,
Gateway, Gateway,
GatewayOptions, GatewayOptions,
Network, Network,
Transaction, Transaction,
TransactionEvent,
Wallet, Wallet,
} from 'fabric-network'; } from 'fabric-network';
import * as fabricProtos from 'fabric-protos'; import * as fabricProtos from 'fabric-protos';
import { MockProxy, mock } from 'jest-mock-extended'; import { MockProxy, mock } from 'jest-mock-extended';
import IORedis, { Redis } from 'ioredis';
import Long from 'long'; import Long from 'long';
import { Job } from 'bullmq';
jest.mock('./config'); jest.mock('./config');
jest.mock('fabric-network', () => {
type FabricNetworkModule = jest.Mocked<typeof import('fabric-network')>;
const originalModule: FabricNetworkModule =
jest.requireActual('fabric-network');
const mockModule: FabricNetworkModule =
jest.createMockFromModule('fabric-network');
return {
__esModule: true,
...mockModule,
Wallets: originalModule.Wallets,
};
});
jest.mock('ioredis', () => require('ioredis-mock/jest')); jest.mock('ioredis', () => require('ioredis-mock/jest'));
describe('Fabric', () => { describe('Fabric', () => {
const mockTransactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
const mockKey = `txn:${mockTransactionId}`;
const mockMspId = 'Org1MSP';
const mockState = Buffer.from(
`{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}`
);
const mockArgs = '["test111","red",400,"Jean",101]';
const mockTimestamp = 1628078044362;
const addMockTransationDetails = async (redis: Redis) => {
await redis
.multi()
.hset(
mockKey,
'mspId',
mockMspId,
'state',
mockState,
'args',
mockArgs,
'timestamp',
mockTimestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', mockTimestamp, mockTransactionId)
.exec();
};
describe('createWallet', () => { describe('createWallet', () => {
it('creates a wallet containing identities for both orgs', async () => { it('creates a wallet containing identities for both orgs', async () => {
const wallet = await createWallet(); const wallet = await createWallet();
@ -137,162 +117,130 @@ describe('Fabric', () => {
}); });
}); });
describe('startRetryLoop', () => { describe('processSubmitTransactionJob', () => {
let redis: Redis; const mockContracts = new Map<string, Contract>();
const mockPayload = Buffer.from('MOCK PAYLOAD');
const mockSavedState = Buffer.from('MOCK SAVED STATE');
let mockTransaction: MockProxy<Transaction>; let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>; let mockContract: MockProxy<Contract>;
let mockContracts: Map<string, Contract>; let mockJob: MockProxy<Job>;
const flushPromises = () => {
jest.useRealTimers();
return new Promise((resolve) => setImmediate(resolve));
};
beforeEach(() => { beforeEach(() => {
const redisOptions = { mockTransaction = mock<Transaction>();
port: config.redisPort, mockTransaction.getTransactionId.mockReturnValue('mockTransactionId');
host: config.redisHost,
username: config.redisUsername, mockContract = mock<Contract>();
password: config.redisPassword, mockContract.createTransaction
.calledWith('txn')
.mockReturnValue(mockTransaction);
mockContract.deserializeTransaction
.calledWith(mockSavedState)
.mockReturnValue(mockTransaction);
mockContracts.set('mockMspid', mockContract);
mockJob = mock<Job>();
});
it('gets job result with no error or payload if no contract is available for the required mspid', async () => {
mockJob.data = {
mspid: 'missingMspid',
}; };
redis = new IORedis(redisOptions) as unknown as Redis; const jobResult = await processSubmitTransactionJob(
mockContracts,
mockJob
);
mockTransaction = mock<Transaction>(); expect(jobResult).toStrictEqual({
transactionError: undefined,
transactionPayload: undefined,
});
});
it('gets a job result containing a payload if the transaction was successful first time', async () => {
mockJob.data = {
mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
};
mockTransaction.submit mockTransaction.submit
.mockResolvedValue(Buffer.from('MOCK PAYLOAD')) .calledWith('arg1', 'arg2')
.mockName('submit'); .mockResolvedValue(mockPayload);
mockContract = mock<Contract>();
mockContract.deserializeTransaction.mockReturnValue(mockTransaction);
mockContracts = new Map<string, Contract>();
mockContracts.set(mockMspId, mockContract);
jest.useFakeTimers(); const jobResult = await processSubmitTransactionJob(
}); mockContracts,
mockJob
afterEach(() => {
jest.useRealTimers();
});
it('starts a retry loop which does nothing if there are no saved transaction details', async () => {
const getContractSpy = jest.spyOn(mockContracts, 'get');
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(getContractSpy).not.toBeCalled();
});
it('starts a retry loop which clears the saved details after succesfully retrying a transaction', async () => {
addMockTransationDetails(redis);
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
); );
const index = await redis.zrange('index:txn:timestamp', 0, -1); expect(jobResult).toStrictEqual({
expect(index).toStrictEqual([]); transactionError: undefined,
transactionPayload: Buffer.from('MOCK PAYLOAD'),
});
}); });
it('starts a retry loop which increments the retry count when a transaction fails', async () => { it('gets a job result containing a payload if the transaction was successfully rerun using saved transaction state', async () => {
addMockTransationDetails(redis); mockJob.data = {
mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); mspid: 'mockMspid',
transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockResolvedValue(mockPayload);
startRetryLoop(mockContracts, redis); const jobResult = await processSubmitTransactionJob(
jest.runOnlyPendingTimers(); mockContracts,
await flushPromises(); mockJob
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
); );
const index = await redis.zrange('index:txn:timestamp', 0, -1); expect(jobResult).toStrictEqual({
expect(index).toStrictEqual([ transactionError: undefined,
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95', transactionPayload: Buffer.from('MOCK PAYLOAD'),
]); });
const savedTransaction = await (redis as Redis).hgetall(mockKey);
expect(savedTransaction.retries).toBe('1');
}); });
it('starts a retry loop which clears the saved details when a transaction fails as a duplicate', async () => { it('gets a job result containing an error message if the transaction fails but cannot be retried', async () => {
addMockTransationDetails(redis); mockJob.data = {
const mockDuplicateTransactionError = new Error('MOCK ERROR'); mspid: 'mockMspid',
// eslint-disable-next-line @typescript-eslint/no-explicit-any transactionName: 'txn',
(mockDuplicateTransactionError as any).errors = [ transactionArgs: ['arg1', 'arg2'],
{ transactionState: mockSavedState,
endorsements: [ };
{ mockTransaction.submit
details: 'duplicate transaction found', .calledWith('arg1', 'arg2')
}, .mockRejectedValue(
], new Error(
}, 'Failed to get transaction with id txn, error Entry not found in index'
]; )
mockTransaction.submit.mockRejectedValue(mockDuplicateTransactionError); );
startRetryLoop(mockContracts, redis); const jobResult = await processSubmitTransactionJob(
jest.runOnlyPendingTimers(); mockContracts,
await flushPromises(); mockJob
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
); );
const index = await redis.zrange('index:txn:timestamp', 0, -1); expect(jobResult).toStrictEqual({
expect(index).toStrictEqual([]); transactionError:
'TransactionNotFoundError: Failed to get transaction with id txn, error Entry not found in index',
transactionPayload: undefined,
});
}); });
it('starts a retry loop which clears the saved details when a transaction fails the final attempt', async () => { it('throws an error if the transaction fails but can be retried', async () => {
addMockTransationDetails(redis); mockJob.data = {
await (redis as Redis).hincrby(mockKey, 'retries', 5); mspid: 'mockMspid',
mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); transactionName: 'txn',
transactionArgs: ['arg1', 'arg2'],
transactionState: mockSavedState,
};
mockTransaction.submit
.calledWith('arg1', 'arg2')
.mockRejectedValue(new Error('MOCK ERROR'));
startRetryLoop(mockContracts, redis); await expect(async () => {
jest.runOnlyPendingTimers(); await processSubmitTransactionJob(mockContracts, mockJob);
await flushPromises(); }).rejects.toThrow('MOCK ERROR');
expect(mockContract.deserializeTransaction).toBeCalledWith(mockState);
expect(mockTransaction.submit).toBeCalledWith(
'test111',
'red',
400,
'Jean',
101
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
it('starts a retry loop which clears the saved details when no contract exist for the org', async () => {
addMockTransationDetails(redis);
mockContracts = new Map<string, Contract>();
startRetryLoop(mockContracts, redis);
jest.runOnlyPendingTimers();
await flushPromises();
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
}); });
}); });
@ -352,96 +300,65 @@ describe('Fabric', () => {
}).rejects.toThrow(TransactionNotFoundError); }).rejects.toThrow(TransactionNotFoundError);
}); });
it('throws a TransactionError for other errors', async () => { it('throws an Error for other errors', async () => {
mockTransaction.evaluate.mockRejectedValue(new Error('MOCK ERROR')); mockTransaction.evaluate.mockRejectedValue(new Error('MOCK ERROR'));
await expect(async () => { await expect(async () => {
await evatuateTransaction(mockContract, 'txn', 'arga', 'argb'); await evatuateTransaction(mockContract, 'txn', 'arga', 'argb');
}).rejects.toThrow(TransactionError); }).rejects.toThrow(Error);
}); });
}); });
describe('submitTransaction', () => { describe('submitTransaction', () => {
let redis: Redis;
const mockPayload = Buffer.from('MOCK PAYLOAD');
let mockTransaction: MockProxy<Transaction>; let mockTransaction: MockProxy<Transaction>;
let mockContract: MockProxy<Contract>;
beforeEach(async () => {
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
redis = new IORedis(redisOptions) as unknown as Redis;
beforeEach(() => {
mockTransaction = mock<Transaction>(); mockTransaction = mock<Transaction>();
mockTransaction.submit.mockResolvedValue(mockPayload);
mockTransaction.getTransactionId.mockReturnValue('MOCK TXN ID');
mockTransaction.serialize.mockReturnValue(Buffer.from('MOCK TXN STATE'));
mockContract = mock<Contract>();
mockContract.createTransaction
.calledWith('txn')
.mockReturnValue(mockTransaction);
}); });
it('gets the transaction ID of the submitted transaction', async () => { it('gets the result of submitting a transaction', async () => {
const mockPayload = Buffer.from('MOCK PAYLOAD');
mockTransaction.submit.mockResolvedValue(mockPayload);
const result = await submitTransaction( const result = await submitTransaction(
mockContract, mockTransaction,
redis,
'mspid',
'txn', 'txn',
'arga', 'arga',
'argb' 'argb'
); );
expect(result).toBe('MOCK TXN ID'); expect(result.toString()).toBe(mockPayload.toString());
}); });
it.each([ it('throws an AssetExistsError an asset already exists error occurs', async () => {
'the asset GOCHAINCODE already exists', mockTransaction.submit.mockRejectedValue(
'Asset JAVACHAINCODE already exists', new Error('The asset JSCHAINCODE already exists')
'The asset JSCHAINCODE already exists', );
])(
'throws an AssetExistsError an asset already exists error occurs: %s',
async (msg) => {
mockTransaction.submit.mockRejectedValue(new Error(msg));
await expect(async () => { await expect(async () => {
await submitTransaction( await submitTransaction(
mockContract, mockTransaction,
redis, 'mspid',
'mspid', 'txn',
'txn', 'arga',
'arga', 'argb'
'argb' );
); }).rejects.toThrow(AssetExistsError);
}).rejects.toThrow(AssetExistsError); });
}
);
it.each([ it('throws an AssetNotFoundError if an asset does not exist error occurs', async () => {
'the asset GOCHAINCODE does not exist', mockTransaction.submit.mockRejectedValue(
'Asset JAVACHAINCODE does not exist', new Error('The asset JSCHAINCODE does not exist')
'The asset JSCHAINCODE does not exist', );
])(
'throws an AssetNotFoundError if an asset does not exist error occurs: %s',
async (msg) => {
mockTransaction.submit.mockRejectedValue(new Error(msg));
await expect(async () => { await expect(async () => {
await submitTransaction( await submitTransaction(
mockContract, mockTransaction,
redis, 'mspid',
'mspid', 'txn',
'txn', 'arga',
'arga', 'argb'
'argb' );
); }).rejects.toThrow(AssetNotFoundError);
}).rejects.toThrow(AssetNotFoundError); });
}
);
it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => { it('throws a TransactionNotFoundError if a transaction not found error occurs', async () => {
mockTransaction.submit.mockRejectedValue( mockTransaction.submit.mockRejectedValue(
@ -452,8 +369,7 @@ describe('Fabric', () => {
await expect(async () => { await expect(async () => {
await submitTransaction( await submitTransaction(
mockContract, mockTransaction,
redis,
'mspid', 'mspid',
'txn', 'txn',
'arga', 'arga',
@ -462,76 +378,42 @@ describe('Fabric', () => {
}).rejects.toThrow(TransactionNotFoundError); }).rejects.toThrow(TransactionNotFoundError);
}); });
it('throws a TransactionError for other errors', async () => { it('throws an Error for other errors', async () => {
mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR')); mockTransaction.submit.mockRejectedValue(new Error('MOCK ERROR'));
await expect(async () => { await expect(async () => {
await submitTransaction( await submitTransaction(
mockContract, mockTransaction,
redis,
'mspid', 'mspid',
'txn', 'txn',
'arga', 'arga',
'argb' 'argb'
); );
}).rejects.toThrow(TransactionError); }).rejects.toThrow(Error);
}); });
}); });
describe('blockEventHandler', () => { describe('getTransactionValidationCode', () => {
let redis: Redis; it('gets the validation code from a processed transaction', async () => {
let mockIsValidGetter: jest.Mock<boolean, []>; const processedTransactionProto =
let mockTransactionIdGetter: jest.Mock<string, []>; fabricProtos.protos.ProcessedTransaction.create();
let mockTransactionEvent: MockProxy<TransactionEvent>; processedTransactionProto.validationCode =
let mockBlockEvent: MockProxy<BlockEvent>; fabricProtos.protos.TxValidationCode.VALID;
const processedTransactionBuffer = Buffer.from(
fabricProtos.protos.ProcessedTransaction.encode(
processedTransactionProto
).finish()
);
beforeEach(async () => { const mockTransaction = mock<Transaction>();
const redisOptions = { mockTransaction.evaluate.mockResolvedValue(processedTransactionBuffer);
port: config.redisPort, const mockContract = mock<Contract>();
host: config.redisHost, mockContract.createTransaction
username: config.redisUsername, .calledWith('GetTransactionByID')
password: config.redisPassword, .mockReturnValue(mockTransaction);
}; expect(await getTransactionValidationCode(mockContract, 'txn1')).toBe(
'VALID'
redis = new IORedis(redisOptions) as unknown as Redis; );
addMockTransationDetails(redis);
const baseMock = {};
mockTransactionEvent = mock<TransactionEvent>(baseMock);
mockIsValidGetter = jest.fn<boolean, []>();
Object.defineProperty(baseMock, 'isValid', { get: mockIsValidGetter });
mockTransactionIdGetter = jest.fn<string, []>();
Object.defineProperty(baseMock, 'transactionId', {
get: mockTransactionIdGetter,
});
mockBlockEvent = mock<BlockEvent>();
mockBlockEvent.getTransactionEvents.mockReturnValue([
mockTransactionEvent,
]);
});
it('clears saved details for valid transactions', async () => {
const blockListener = blockEventHandler(redis);
mockIsValidGetter.mockReturnValue(true);
mockTransactionIdGetter.mockReturnValue(mockTransactionId);
await blockListener(mockBlockEvent);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
it('does not clear saved details for invalid transactions', async () => {
const blockListener = blockEventHandler(redis);
mockIsValidGetter.mockReturnValue(false);
await blockListener(mockBlockEvent);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95',
]);
}); });
}); });

View file

@ -10,23 +10,20 @@ import {
GatewayOptions, GatewayOptions,
Wallets, Wallets,
Network, Network,
BlockListener, TimeoutError,
BlockEvent, Transaction,
TransactionEvent,
Wallet, Wallet,
} from 'fabric-network'; } from 'fabric-network';
import { Redis } from 'ioredis';
import * as config from './config'; import * as config from './config';
import { logger } from './logger'; import { logger } from './logger';
import { import {
storeTransactionDetails, handleError,
getRetryTransactionDetails, isContractError,
clearTransactionDetails, isDuplicateTransactionError,
incrementRetryCount, } from './errors';
TransactionDetails, import * as protos from 'fabric-protos';
} from './redis'; import { Job } from 'bullmq';
import { handleError, isDuplicateTransactionError } from './errors'; import { JobData, JobResult, updateJobData } from './jobs';
import protos from 'fabric-protos';
/* /*
* Creates an in memory wallet to hold credentials for an Org1 and Org2 user * Creates an in memory wallet to hold credentials for an Org1 and Org2 user
@ -124,55 +121,119 @@ export const getContracts = async (
}; };
/* /*
* Starts a timer to retry transactions at regular intervals * Process a submit transaction request from the job queue
* *
* Note: there is check for whether the transaction has successfully completed * For this sample transactions are retried if they fail with any error,
* since it could succeed between any check and the retry, so the additional * except for errors from the smart contract, or duplicate transaction
* transaction to get the status is unlikely to be worthwhile * errors
*
* You might decide to retry transactions which fail with specific errors
* instead, for example:
* MVCC_READ_CONFLICT
* PHANTOM_READ_CONFLICT
* ENDORSEMENT_POLICY_FAILURE
* CHAINCODE_VERSION_CONFLICT
* EXPIRED_CHAINCODE
*/ */
export const startRetryLoop = ( export const processSubmitTransactionJob = async (
contracts: Map<string, Contract>, contracts: Map<string, Contract>,
redis: Redis job: Job<JobData, JobResult>
): void => { ): Promise<JobResult> => {
const retryInterval = setInterval( logger.debug({ jobId: job.id, jobName: job.name }, 'Processing job');
async (contracts, redis) => {
if (logger.isLevelEnabled('debug')) { const contract = contracts.get(job.data.mspid);
try { if (contract === undefined) {
const pendingTransactionCount = await (redis as Redis).zcard( logger.error(
'index:txn:timestamp' { jobId: job.id, jobName: job.name },
); 'Contract not found for MSP ID %s',
logger.debug( job.data.mspid
'%d transactions awaiting retry', );
pendingTransactionCount
); // Retrying will not work, so give up with an unsuccessful result
} catch (err) { return {
logger.warn({ err }, 'Error getting pending transaction count'); transactionError: undefined,
} transactionPayload: undefined,
};
}
let transaction: Transaction;
if (job.data.transactionState) {
const savedState = job.data.transactionState;
logger.debug(
{
jobId: job.id,
jobName: job.name,
savedState,
},
'Using previously saved transaction state'
);
transaction = contract.deserializeTransaction(savedState);
} else {
logger.debug(
{
jobId: job.id,
jobName: job.name,
},
'Using new transaction'
);
transaction = contract.createTransaction(job.data.transactionName);
await updateJobData(job, transaction);
}
try {
logger.debug(
{
jobId: job.id,
jobName: job.name,
transactionId: transaction.getTransactionId(),
},
'Submitting transaction'
);
const args = job.data.transactionArgs;
const payload = await submitTransaction(transaction, ...args);
return {
transactionError: undefined,
transactionPayload: payload,
};
} catch (err) {
if (
err instanceof Error &&
(isContractError(err) || isDuplicateTransactionError(err))
) {
logger.error(
{ jobId: job.id, jobName: job.name, err },
'Fatal transaction error occurred'
);
// Return a job result to stop retrying
return {
transactionError: err.toString(),
transactionPayload: undefined,
};
} else {
logger.warn(
{ jobId: job.id, jobName: job.name, err },
'Retryable transaction error occurred'
);
// The original transaction may eventually get committed in the case of
// a timeout error, so keep the same transaction ID to protect against
// unintended duplicate transactions
if (!(err instanceof TimeoutError)) {
logger.debug(
{ jobId: job.id, jobName: job.name },
'Clearing saved transaction state'
);
await updateJobData(job, undefined);
} }
const savedTransaction = await getRetryTransactionDetails(redis); // Rethrow the error to keep retrying
throw err;
if (savedTransaction) { }
const contract = contracts.get(savedTransaction.mspId); }
if (contract) {
await retryTransaction(contract, redis, savedTransaction);
} else {
clearTransactionDetails(redis, savedTransaction.transactionId);
logger.error(
'No contract found for %s to retry transaction %s',
savedTransaction.mspId,
savedTransaction.transactionId
);
}
}
},
config.retryDelay,
contracts,
redis
);
retryInterval.unref();
}; };
/* /*
@ -183,146 +244,64 @@ export const evatuateTransaction = async (
transactionName: string, transactionName: string,
...transactionArgs: string[] ...transactionArgs: string[]
): Promise<Buffer> => { ): Promise<Buffer> => {
const txn = contract.createTransaction(transactionName); const transaction = contract.createTransaction(transactionName);
const txnId = txn.getTransactionId(); const transactionId = transaction.getTransactionId();
logger.trace({ transaction }, 'Evaluating transaction');
try { try {
const payload = await txn.evaluate(...transactionArgs); const payload = await transaction.evaluate(...transactionArgs);
logger.debug( logger.trace(
{ transactionId: txnId, payload: payload.toString() }, { transactionId: transactionId, payload: payload.toString() },
'Evaluate transaction response received' 'Evaluate transaction response received'
); );
return payload; return payload;
} catch (err) {
throw handleError(transactionId, err);
}
};
/*
* Submit a transaction and handle any errors
*/
export const submitTransaction = async (
transaction: Transaction,
...transactionArgs: string[]
): Promise<Buffer> => {
logger.trace({ transaction }, 'Submitting transaction');
const txnId = transaction.getTransactionId();
try {
const payload = await transaction.submit(...transactionArgs);
logger.trace(
{ transactionId: txnId, payload: payload.toString() },
'Submit transaction response received'
);
return payload;
} catch (err) { } catch (err) {
throw handleError(txnId, err); throw handleError(txnId, err);
} }
}; };
/* /*
* Submit a transaction and handle any errors * Get the validation code of the specified transaction
*
* Transaction details are saved before being submitted so that they can be
* retried if any errors occur
*/ */
export const submitTransaction = async ( export const getTransactionValidationCode = async (
contract: Contract, qsccContract: Contract,
redis: Redis, transactionId: string
mspId: string,
transactionName: string,
...transactionArgs: string[]
): Promise<string> => { ): Promise<string> => {
const txn = contract.createTransaction(transactionName); const data = await evatuateTransaction(
const txnId = txn.getTransactionId(); qsccContract,
const txnState = txn.serialize(); 'GetTransactionByID',
const txnArgs = JSON.stringify(transactionArgs); config.channelName,
const timestamp = Date.now(); transactionId
);
try { const processedTransaction = protos.protos.ProcessedTransaction.decode(data);
// Store the transaction details and set the event handler in case there const validationCode =
// are problems later with commiting the transaction protos.protos.TxValidationCode[processedTransaction.validationCode];
await storeTransactionDetails(
redis,
txnId,
mspId,
txnState,
txnArgs,
timestamp
);
txn.setEventHandler(DefaultEventHandlerStrategies.NONE);
await txn.submit(...transactionArgs);
} catch (err) {
// If the transaction failed to endorse, there is no point attempting
// to retry it later so clear the transaction details
// TODO will this always catch endorsement errors or can they
// arrive later?
await clearTransactionDetails(redis, txnId);
throw handleError(txnId, err);
}
return txnId; logger.debug({ transactionId }, 'Validation code: %s', validationCode);
}; return validationCode;
/*
* Retry a transaction
*
* The saved transaction details include a retry count which is used to ensure
* failing transactions are not retried indefinitely
*/
const retryTransaction = async (
contract: Contract,
redis: Redis,
savedTransaction: TransactionDetails
): Promise<void> => {
logger.debug('Retrying transaction %s', savedTransaction.transactionId);
try {
const transaction = contract.deserializeTransaction(
savedTransaction.transactionState
);
const args: string[] = JSON.parse(savedTransaction.transactionArgs);
const payload = await transaction.submit(...args);
logger.debug(
{
transactionId: savedTransaction.transactionId,
payload: payload.toString(),
},
'Retry transaction response received'
);
await clearTransactionDetails(redis, savedTransaction.transactionId);
} catch (err) {
if (isDuplicateTransactionError(err)) {
logger.warn(
'Transaction %s has already been committed',
savedTransaction.transactionId
);
await clearTransactionDetails(redis, savedTransaction.transactionId);
} else {
logger.warn(
err,
'Retry %d failed for transaction %s',
savedTransaction.retries,
savedTransaction.transactionId
);
if (savedTransaction.retries < config.maxRetryCount) {
await incrementRetryCount(redis, savedTransaction.transactionId);
} else {
await clearTransactionDetails(redis, savedTransaction.transactionId);
}
}
}
};
/*
* Block event listener to handle successful transactions
*
* Transaction details are saved before being submitted so that they can be
* retried, and this listener deletes those transaction details for any
* successful transactions
*
* Transactions can be submitted using one of two identities however one one
* of those identities is used to listen for block events
*/
export const blockEventHandler = (redis: Redis): BlockListener => {
const blockListener = async (event: BlockEvent) => {
logger.debug(
{ blockNumber: event.blockNumber.toString() },
'Block event received'
);
const transactionEvents: Array<TransactionEvent> =
event.getTransactionEvents();
for (const event of transactionEvents) {
if (event && event.isValid) {
logger.debug('Remove transation with txnId %s', event.transactionId);
await clearTransactionDetails(redis, event.transactionId);
}
}
};
return blockListener;
}; };
/* /*
@ -340,6 +319,7 @@ export const getBlockHeight = async (
); );
const info = protos.common.BlockchainInfo.decode(data); const info = protos.common.BlockchainInfo.decode(data);
const blockHeight = info.height; const blockHeight = info.height;
logger.debug('Current block height: %d', blockHeight); logger.debug('Current block height: %d', blockHeight);
return blockHeight; return blockHeight;
}; };

View file

@ -8,6 +8,8 @@ import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { getBlockHeight } from './fabric'; import { getBlockHeight } from './fabric';
import { logger } from './logger'; import { logger } from './logger';
import * as config from './config'; import * as config from './config';
import { Queue } from 'bullmq';
import { getJobCounts } from './jobs';
const { SERVICE_UNAVAILABLE, OK } = StatusCodes; const { SERVICE_UNAVAILABLE, OK } = StatusCodes;
@ -27,21 +29,26 @@ healthRouter.get('/ready', (_req, res: Response) =>
healthRouter.get('/live', async (req: Request, res: Response) => { healthRouter.get('/live', async (req: Request, res: Response) => {
logger.debug(req.body, 'Liveness request received'); logger.debug(req.body, 'Liveness request received');
const qsccOrg1 = req.app.get(config.mspIdOrg1).qsccContract as Contract;
const qsccOrg2 = req.app.get(config.mspIdOrg2).qsccContract as Contract;
try { try {
await Promise.all([getBlockHeight(qsccOrg1), getBlockHeight(qsccOrg2)]); const submitQueue = req.app.get('jobq') as Queue;
} catch (err) { const qsccOrg1 = req.app.get(config.mspIdOrg1).qsccContract as Contract;
logger.error(err, 'Error processing liveness request'); const qsccOrg2 = req.app.get(config.mspIdOrg2).qsccContract as Contract;
res.status(SERVICE_UNAVAILABLE).json({ await Promise.all([
getBlockHeight(qsccOrg1),
getBlockHeight(qsccOrg2),
getJobCounts(submitQueue),
]);
} catch (err) {
logger.error({ err }, 'Error processing liveness request');
return res.status(SERVICE_UNAVAILABLE).json({
status: getReasonPhrase(SERVICE_UNAVAILABLE), status: getReasonPhrase(SERVICE_UNAVAILABLE),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });
} }
res.status(OK).json({ return res.status(OK).json({
status: getReasonPhrase(OK), status: getReasonPhrase(OK),
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
}); });

View file

@ -2,19 +2,93 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import { Contract } from 'fabric-network';
import * as config from './config'; import * as config from './config';
import {
createGateway,
createWallet,
getContracts,
getNetwork,
} from './fabric';
import {
initJobQueue,
initJobQueueScheduler,
initJobQueueWorker,
} from './jobs';
import { logger } from './logger'; import { logger } from './logger';
import { createServer } from './server'; import { createServer } from './server';
import { isMaxmemoryPolicyNoeviction } from './redis';
import { Queue, QueueScheduler, Worker } from 'bullmq';
let jobQueue: Queue | undefined;
let jobQueueWorker: Worker | undefined;
let jobQueueScheduler: QueueScheduler | undefined;
async function main() { async function main() {
logger.info('Checking Redis config');
if (!(await isMaxmemoryPolicyNoeviction())) {
throw new Error(
'Invalid redis configuration: redis instance must have the setting maxmemory-policy=noeviction'
);
}
logger.info('Connecting to Fabric network');
const wallet = await createWallet();
const gatewayOrg1 = await createGateway(
config.connectionProfileOrg1,
config.mspIdOrg1,
wallet
);
const networkOrg1 = await getNetwork(gatewayOrg1);
const contractsOrg1 = await getContracts(networkOrg1);
const gatewayOrg2 = await createGateway(
config.connectionProfileOrg2,
config.mspIdOrg2,
wallet
);
const networkOrg2 = await getNetwork(gatewayOrg2);
const contractsOrg2 = await getContracts(networkOrg2);
const assetContracts = new Map<string, Contract>();
assetContracts.set(config.mspIdOrg1, contractsOrg1.assetContract);
assetContracts.set(config.mspIdOrg2, contractsOrg2.assetContract);
logger.info('Initialising submit job queue');
jobQueue = initJobQueue();
jobQueueWorker = initJobQueueWorker(assetContracts);
if (config.submitJobQueueScheduler === true) {
logger.info('Initialising submit job queue scheduler');
jobQueueScheduler = initJobQueueScheduler();
}
logger.info('Creating REST server');
const app = await createServer(); const app = await createServer();
app.set(config.mspIdOrg1, contractsOrg1);
app.set(config.mspIdOrg2, contractsOrg2);
app.set('jobq', jobQueue);
app.listen(config.port, () => { app.listen(config.port, () => {
logger.info('Express server started on port: %d', config.port); logger.info('REST server started on port: %d', config.port);
}); });
} }
// TODO handle errors! E.g. try starting with the wrong cert and private key! main().catch(async (err) => {
main().catch((err) => { logger.error({ err }, 'Unxepected error');
logger.error(err, 'Unxepected error');
if (jobQueueScheduler != undefined) {
logger.debug('Closing job queue scheduler');
await jobQueueScheduler.close();
}
if (jobQueueWorker != undefined) {
logger.debug('Closing job queue worker');
await jobQueueWorker.close();
}
if (jobQueue != undefined) {
logger.debug('Closing job queue');
await jobQueue.close();
}
}); });

View file

@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/
import { Queue } from 'bullmq';
import express, { Request, Response } from 'express';
import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { JobNotFoundError } from './errors';
import { getJobSummary } from './jobs';
import { logger } from './logger';
const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes;
export const jobsRouter = express.Router();
jobsRouter.get('/:jobId', async (req: Request, res: Response) => {
const jobId = req.params.jobId;
logger.debug('Read request received for job ID %s', jobId);
try {
const submitQueue = req.app.get('jobq') as Queue;
const jobSummary = await getJobSummary(submitQueue, jobId);
return res.status(OK).json(jobSummary);
} catch (err) {
logger.error({ err }, 'Error processing read request for job ID %s', jobId);
if (err instanceof JobNotFoundError) {
return res.status(NOT_FOUND).json({
status: getReasonPhrase(NOT_FOUND),
timestamp: new Date().toISOString(),
});
}
return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(),
});
}
});

View file

@ -0,0 +1,155 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/
import { Job, Queue } from 'bullmq';
import { getJobCounts, getJobSummary } from './jobs';
import { mock, MockProxy } from 'jest-mock-extended';
import { JobNotFoundError } from './errors';
describe('initJobQueue', () => {
it.todo('write tests');
});
describe('initJobQueueWorker', () => {
it.todo('write tests');
});
describe('initJobQueueScheduler', () => {
it.todo('write tests');
});
describe('addSubmitTransactionJob', () => {
it.todo('write tests');
});
describe('getJobSummary', () => {
let mockQueue: MockProxy<Queue>;
let mockJob: MockProxy<Job>;
beforeEach(() => {
mockQueue = mock<Queue>();
mockJob = mock<Job>();
});
it('throws a JobNotFoundError if the Job is undefined', async () => {
mockQueue.getJob.calledWith('1').mockResolvedValue(undefined);
await expect(async () => {
await getJobSummary(mockQueue, '1');
}).rejects.toThrow(JobNotFoundError);
});
it('gets a job summary with transaction payload data', async () => {
mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob);
mockJob.id = '1';
mockJob.data = {
transactionIds: ['txn1'],
};
mockJob.returnvalue = {
transactionPayload: Buffer.from('MOCK PAYLOAD'),
};
expect(await getJobSummary(mockQueue, '1')).toStrictEqual({
jobId: '1',
transactionIds: ['txn1'],
transactionError: undefined,
transactionPayload: 'MOCK PAYLOAD',
});
});
it('gets a job summary with empty transaction payload data', async () => {
mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob);
mockJob.id = '1';
mockJob.data = {
transactionIds: ['txn1'],
};
mockJob.returnvalue = {
transactionPayload: Buffer.from(''),
};
expect(await getJobSummary(mockQueue, '1')).toStrictEqual({
jobId: '1',
transactionIds: ['txn1'],
transactionError: undefined,
transactionPayload: '',
});
});
it('gets a job summary with a transaction error', async () => {
mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob);
mockJob.id = '1';
mockJob.data = {
transactionIds: ['txn1'],
};
mockJob.returnvalue = {
transactionError: 'MOCK ERROR',
};
expect(await getJobSummary(mockQueue, '1')).toStrictEqual({
jobId: '1',
transactionIds: ['txn1'],
transactionError: 'MOCK ERROR',
transactionPayload: '',
});
});
it('gets a job summary when there is no return value', async () => {
mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob);
mockJob.id = '1';
mockJob.returnvalue = undefined;
mockJob.data = {
transactionIds: ['txn1'],
};
expect(await getJobSummary(mockQueue, '1')).toStrictEqual({
jobId: '1',
transactionIds: ['txn1'],
transactionError: undefined,
transactionPayload: undefined,
});
});
it('gets a job summary when there is no job data', async () => {
mockQueue.getJob.calledWith('1').mockResolvedValue(mockJob);
mockJob.id = '1';
mockJob.data = undefined;
mockJob.returnvalue = {
transactionPayload: Buffer.from('MOCK PAYLOAD'),
};
expect(await getJobSummary(mockQueue, '1')).toStrictEqual({
jobId: '1',
transactionIds: [],
transactionError: undefined,
transactionPayload: 'MOCK PAYLOAD',
});
});
});
describe('updateSubmitTransactionJobStateData', () => {
it.todo('write tests');
});
describe('getJobCounts', () => {
it('gets job counts from the specified queue', async () => {
const mockQueue = mock<Queue>();
mockQueue.getJobCounts
.calledWith('active', 'completed', 'delayed', 'failed', 'waiting')
.mockResolvedValue({
active: 1,
completed: 2,
delayed: 3,
failed: 4,
waiting: 5,
});
expect(await getJobCounts(mockQueue)).toStrictEqual({
active: 1,
completed: 2,
delayed: 3,
failed: 4,
waiting: 5,
});
});
});

View file

@ -0,0 +1,216 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* This sample uses BullMQ jobs to process submit transactions, which includes
* retry support for failing jobs
*
* Important: BullMQ requires the following setting in redis
* maxmemory-policy=noeviction
* For details, see: https://docs.bullmq.io/guide/connections
*/
import { ConnectionOptions, Job, Queue, QueueScheduler, Worker } from 'bullmq';
import { Contract, Transaction } from 'fabric-network';
import * as config from './config';
import { JobNotFoundError } from './errors';
import { processSubmitTransactionJob } from './fabric';
import { logger } from './logger';
export type JobData = {
mspid: string;
transactionName: string;
transactionArgs: string[];
transactionState?: Buffer;
transactionIds: string[];
};
export type JobResult = {
transactionPayload?: Buffer;
transactionError?: string;
};
// TODO include attempts made?
export type JobSummary = {
jobId: string;
transactionIds: string[];
transactionPayload?: string;
transactionError?: string;
};
const connection: ConnectionOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
export const initJobQueue = (): Queue => {
const submitQueue = new Queue(config.JOB_QUEUE_NAME, {
connection,
defaultJobOptions: {
attempts: config.submitJobAttempts,
backoff: {
type: config.submitJobBackoffType,
delay: config.submitJobBackoffDelay,
},
removeOnComplete: config.maxCompletedSubmitJobs,
removeOnFail: config.maxFailedSubmitJobs,
},
});
return submitQueue;
};
export const initJobQueueWorker = (
contracts: Map<string, Contract>
): Worker => {
const worker = new Worker<JobData, JobResult>(
config.JOB_QUEUE_NAME,
async (job): Promise<JobResult> => {
return await processSubmitTransactionJob(contracts, job);
},
{ connection, concurrency: config.submitJobConcurrency }
);
worker.on('failed', (job) => {
logger.error({ job }, 'Job failed'); // WHY?!
});
// Important: need to handle this error otherwise worker may stop
// processing jobs
worker.on('error', (err) => {
logger.error({ err }, 'Worker error');
});
if (logger.isLevelEnabled('debug')) {
worker.on('completed', (job) => {
logger.debug({ job }, 'Job completed');
});
}
return worker;
};
export const initJobQueueScheduler = (): QueueScheduler => {
const queueScheduler = new QueueScheduler(config.JOB_QUEUE_NAME, {
connection,
});
queueScheduler.on('failed', (jobId, failedReason) => {
// TODO when does this happen, and how should it be handled?
logger.error({ jobId, failedReason }, 'Queue sceduler failure');
});
return queueScheduler;
};
export const addSubmitTransactionJob = async (
submitQueue: Queue<JobData, JobResult>,
mspid: string,
transactionName: string,
...transactionArgs: string[]
): Promise<string> => {
const jobName = `submit ${transactionName} transaction`;
const job = await submitQueue.add(jobName, {
mspid,
transactionName,
transactionArgs: transactionArgs,
transactionIds: [],
});
if (job?.id === undefined) {
throw new Error('Submit transaction job ID not available');
}
return job.id;
};
/*
* Gets a summary for the jobs endpoint
*/
export const getJobSummary = async (
queue: Queue,
jobId: string
): Promise<JobSummary> => {
const job: Job<JobData, JobResult> | undefined = await queue.getJob(jobId);
logger.debug({ job }, 'Got job');
if (!(job && job.id != undefined)) {
throw new JobNotFoundError(`Job ${jobId} not found`, jobId);
}
let transactionIds: string[];
if (job.data && job.data.transactionIds) {
transactionIds = job.data.transactionIds;
} else {
transactionIds = [];
}
let transactionError;
let transactionPayload;
const returnValue = job.returnvalue;
if (returnValue) {
if (returnValue.transactionError) {
transactionError = returnValue.transactionError;
}
if (
returnValue.transactionPayload &&
returnValue.transactionPayload.length > 0
) {
transactionPayload = returnValue.transactionPayload.toString();
} else {
transactionPayload = '';
}
}
const jobSummary: JobSummary = {
jobId: job.id,
transactionIds,
transactionError,
transactionPayload,
};
return jobSummary;
};
export const updateJobData = async (
job: Job<JobData, JobResult>,
transaction: Transaction | undefined
): Promise<void> => {
const newData = { ...job.data };
if (transaction != undefined) {
const transationIds = ([] as string[]).concat(
newData.transactionIds,
transaction.getTransactionId()
);
newData.transactionIds = transationIds;
newData.transactionState = transaction.serialize();
} else {
newData.transactionState = undefined;
}
await job.update(newData);
};
/*
* Get the current job counts
*
* This function is used for the liveness REST endpoint
*/
export const getJobCounts = async (
queue: Queue
): Promise<{ [index: string]: number }> => {
const jobCounts = await queue.getJobCounts(
'active',
'completed',
'delayed',
'failed',
'waiting'
);
logger.debug({ jobCounts }, 'Current job counts');
return jobCounts;
};

View file

@ -2,183 +2,33 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import * as config from './config'; import { isMaxmemoryPolicyNoeviction } from './redis';
import IORedis, { Redis } from 'ioredis';
import {
clearTransactionDetails,
incrementRetryCount,
storeTransactionDetails,
getTransactionDetails,
getRetryTransactionDetails,
} from './redis';
jest.mock('ioredis', () => require('ioredis-mock/jest')); const mockRedisConfig = jest.fn();
jest.mock('ioredis', () => {
return jest.fn().mockImplementation(() => {
return {
config: mockRedisConfig,
disconnect: jest.fn(),
};
});
});
jest.mock('./config'); jest.mock('./config');
describe('Redis', () => { describe('Redis', () => {
let redis: Redis; beforeEach(() => {
mockRedisConfig.mockClear();
const mockTransactionId =
'0ae62c01e4c4b112c3f3954a2f11243da76778e46df9ad2783bcbafc79652b95';
const mockKey = `txn:${mockTransactionId}`;
const mockMspId = 'Org1MSP';
const mockState = Buffer.from(
`{"name":"CreateAsset","nonce":"damqinq8nrI4n4qY8lFVsZw7RwG2ufrv","transactionId":${mockTransactionId}`
);
const mockArgs = '["test111","red",400,"Jean",101]';
const mockTimestamp = 1628078044362;
const addMockTransationDetails = async (redis: Redis) => {
await redis
.multi()
.hset(
mockKey,
'mspId',
mockMspId,
'state',
mockState,
'args',
mockArgs,
'timestamp',
mockTimestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', mockTimestamp, mockTransactionId)
.exec();
};
beforeEach(async () => {
const redisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
redis = new IORedis(redisOptions) as unknown as Redis;
});
describe('storeTransactionDetails', () => {
it('stores transaction details as a hash', async () => {
await storeTransactionDetails(
redis,
mockTransactionId,
mockMspId,
mockState,
mockArgs,
mockTimestamp
);
const storedTransaction = await redis.hgetall(mockKey);
const expectedTransaction = {
mspId: mockMspId,
state: mockState,
args: mockArgs,
retries: '0',
timestamp: mockTimestamp.toString(),
};
expect(storedTransaction).toStrictEqual(expectedTransaction);
});
it('adds the transaction ID to the sorted set timestamp index', async () => {
await storeTransactionDetails(
redis,
mockTransactionId,
mockMspId,
mockState,
mockArgs,
mockTimestamp
);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([mockTransactionId]);
});
// TODO this seems to work for spying/mocking...
// jest.spyOn(redis, 'multi').mock...
// but haven't worked out how to spy on the hset, zadd, exec in that chain
// Ask Mark?
it.todo('handles an error from redis');
}); });
describe('getTransactionDetails', () => { describe('isMaxmemoryPolicyNoeviction', () => {
it('gets the transaction details from a hash', async () => { it('returns true when the maxmemory-policy is noeviction', async () => {
await addMockTransationDetails(redis); mockRedisConfig.mockReturnValue(['maxmemory-policy', 'noeviction']);
expect(await isMaxmemoryPolicyNoeviction()).toBe(true);
const details = await getTransactionDetails(redis, mockTransactionId);
expect(details).toStrictEqual({
transactionId: mockTransactionId,
mspId: mockMspId,
transactionState: mockState,
transactionArgs: mockArgs,
retries: 0,
timestamp: mockTimestamp,
});
}); });
it.todo('handles an error from redis'); it('returns false when the maxmemory-policy is not noeviction', async () => {
}); mockRedisConfig.mockReturnValue(['maxmemory-policy', 'allkeys-lru']);
expect(await isMaxmemoryPolicyNoeviction()).toBe(false);
describe('getRetryTransactionDetails', () => {
it('gets the oldest transaction details from a hash', async () => {
await addMockTransationDetails(redis);
const details = await getRetryTransactionDetails(redis);
expect(details).toStrictEqual({
transactionId: mockTransactionId,
mspId: mockMspId,
transactionState: mockState,
transactionArgs: mockArgs,
retries: 0,
timestamp: mockTimestamp,
});
}); });
it('gets undefined if there are no transactions to retry', async () => {
const details = await getRetryTransactionDetails(redis);
expect(details).toBeUndefined();
});
it.todo('handles an error from redis');
});
describe('clearTransactionDetails', () => {
it('removes the transaction details hash', async () => {
await addMockTransationDetails(redis);
await clearTransactionDetails(redis, mockTransactionId);
const storedTransaction = await redis.hgetall(mockKey);
expect(storedTransaction).not.toHaveProperty('state');
});
it('removes the transaction ID from the sorted set timestamp index', async () => {
await addMockTransationDetails(redis);
await clearTransactionDetails(redis, mockTransactionId);
const index = await redis.zrange('index:txn:timestamp', 0, -1);
expect(index).toStrictEqual([]);
});
});
describe('incrementRetryCount', () => {
it('increments the retries value in the transction details hash', async () => {
await addMockTransationDetails(redis);
await incrementRetryCount(redis, mockTransactionId);
const retries = await redis.hget(mockKey, 'retries');
expect(retries).toBe('1');
});
it.todo(
'updates the position of the transaction ID in the sorted set timestamp index'
);
it.todo('handles an error from redis');
}); });
}); });

View file

@ -1,12 +1,7 @@
/* /*
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
* *
* This sample includes basic retry logic so it needs somewhere to store * TBC
* transaction details in case the app restarts for any reason, and Redis is
* just one of the options available
*
* Note: This implementation is not designed with multiple instances of the
* REST app in mind, which is likely to be required in a production environment
*/ */
import IORedis, { Redis, RedisOptions } from 'ioredis'; import IORedis, { Redis, RedisOptions } from 'ioredis';
@ -14,184 +9,43 @@ import IORedis, { Redis, RedisOptions } from 'ioredis';
import * as config from './config'; import * as config from './config';
import { logger } from './logger'; import { logger } from './logger';
const redisOptions: RedisOptions = {
port: config.redisPort,
host: config.redisHost,
username: config.redisUsername,
password: config.redisPassword,
};
export const redis = new IORedis(redisOptions);
export type TransactionDetails = {
transactionId: string;
mspId: string;
transactionState: Buffer;
transactionArgs: string;
timestamp: number;
retries: number;
};
/* /*
* Store enough information in order to resubmit a transaction * Check whether the maxmemory-policy config is set to noeviction
*
* BullMQ requires this setting in redis
* For details, see: https://docs.bullmq.io/guide/connections
*/ */
export const storeTransactionDetails = async ( export const isMaxmemoryPolicyNoeviction = async (): Promise<boolean> => {
redis: Redis, let redis: Redis | undefined;
transactionId: string,
mspId: string, const redisOptions: RedisOptions = {
transactionState: Buffer, port: config.redisPort,
transactionArgs: string, host: config.redisHost,
timestamp: number username: config.redisUsername,
): Promise<void> => { password: config.redisPassword,
};
try { try {
const key = `txn:${transactionId}`; redis = new IORedis(redisOptions);
logger.debug(
{
key,
mspId,
transactionState,
transactionArgs,
timestamp,
},
'Storing transaction details'
);
await redis
.multi()
.hset(
key,
'mspId',
mspId,
'state',
transactionState,
'args',
transactionArgs,
'timestamp',
timestamp,
'retries',
'0'
)
.zadd('index:txn:timestamp', timestamp, transactionId)
.exec();
} catch (err) {
// TODO just log?!
logger.error(
{ err },
'Error storing details for transaction ID %s',
transactionId
);
}
};
/* const maxmemoryPolicyConfig = await (redis as Redis).config(
* Get the information required to resubmit a transaction 'GET',
*/ 'maxmemory-policy'
export const getTransactionDetails = async (
redis: Redis,
transactionId: string
): Promise<TransactionDetails | undefined> => {
try {
const savedTransaction = await (redis as Redis).hgetall(
`txn:${transactionId}`
);
logger.debug(
{ transactionId: transactionId, state: savedTransaction },
'Got transaction details'
); );
logger.debug({ maxmemoryPolicyConfig }, 'Got maxmemory-policy config');
const transactionDetails = { if (
transactionId: transactionId, maxmemoryPolicyConfig.length == 2 &&
mspId: savedTransaction.mspId, 'maxmemory-policy' === maxmemoryPolicyConfig[0] &&
transactionState: Buffer.from(savedTransaction.state), 'noeviction' === maxmemoryPolicyConfig[1]
transactionArgs: savedTransaction.args, ) {
timestamp: parseInt(savedTransaction.timestamp), return true;
retries: parseInt(savedTransaction.retries), }
}; } finally {
return transactionDetails; if (redis != undefined) {
} catch (err) { redis.disconnect();
// TODO just log?!
logger.error(
{ err },
'Error getting details for transaction ID %s',
transactionId
);
}
};
/*
* Get the oldest transaction details
*/
export const getRetryTransactionDetails = async (
redis: Redis
): Promise<TransactionDetails | undefined> => {
try {
const transactionIds = await (redis as Redis).zrange(
'index:txn:timestamp',
-1,
-1
);
if (transactionIds.length > 0) {
const transactionId = transactionIds[0];
const savedTransaction = await getTransactionDetails(
redis,
transactionId
);
return savedTransaction;
} }
} catch (err) {
// TODO just log?!
logger.error(
{ err },
'Error getting details for next transaction to retry'
);
} }
};
/* return false;
* Delete transaction details
*/
export const clearTransactionDetails = async (
redis: Redis,
transactionId: string
): Promise<void> => {
const key = `txn:${transactionId}`;
logger.debug('Removing transaction details. Key: %s', key);
try {
await redis
.multi()
.del(key)
.zrem('index:txn:timestamp', transactionId)
.exec();
} catch (err) {
// TODO just log?!
logger.error(
{ err },
'Error remove details for transaction ID %s',
transactionId
);
}
};
/*
* Increment the number of times the transaction has been retried
* TODO needs to update the timestamp and index as well
*/
export const incrementRetryCount = async (
redis: Redis,
transactionId: string
): Promise<void> => {
const key = `txn:${transactionId}`;
logger.debug('Incrementing retries fortransaction Key: %s', key);
try {
await (redis as Redis).hincrby(`txn:${transactionId}`, 'retries', 1);
} catch (err) {
// TODO just log?!
logger.error(
err,
'Error incrementing retries for transaction ID %s',
transactionId
);
}
}; };

View file

@ -2,30 +2,19 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import helmet from 'helmet';
import { StatusCodes, getReasonPhrase } from 'http-status-codes';
import express, { Application, NextFunction, Request, Response } from 'express'; import express, { Application, NextFunction, Request, Response } from 'express';
import pinoMiddleware from 'pino-http'; import helmet from 'helmet';
import { Contract } from 'fabric-network'; import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { logger } from './logger';
import { assetsRouter } from './assets.router';
import { healthRouter } from './health.router';
import { transactionsRouter } from './transactions.router';
import {
getContracts,
getNetwork,
createGateway,
createWallet,
startRetryLoop,
blockEventHandler,
} from './fabric';
import { redis } from './redis';
import * as config from './config';
const { BAD_REQUEST, INTERNAL_SERVER_ERROR, NOT_FOUND } = StatusCodes;
import { authenticateApiKey, fabricAPIKeyStrategy } from './auth';
import passport from 'passport'; import passport from 'passport';
import pinoMiddleware from 'pino-http';
import { assetsRouter } from './assets.router';
import { authenticateApiKey, fabricAPIKeyStrategy } from './auth';
import { healthRouter } from './health.router';
import { jobsRouter } from './jobs.router';
import { logger } from './logger';
import { transactionsRouter } from './transactions.router';
const { BAD_REQUEST, INTERNAL_SERVER_ERROR, NOT_FOUND } = StatusCodes;
export const createServer = async (): Promise<Application> => { export const createServer = async (): Promise<Application> => {
const app = express(); const app = express();
@ -71,42 +60,9 @@ export const createServer = async (): Promise<Application> => {
app.use(helmet()); app.use(helmet());
} }
const wallet = await createWallet();
const gatewayOrg1 = await createGateway(
config.connectionProfileOrg1,
config.mspIdOrg1,
wallet
);
const networkOrg1 = await getNetwork(gatewayOrg1);
const contractsOrg1 = await getContracts(networkOrg1);
app.set(config.mspIdOrg1, contractsOrg1);
const gatewayOrg2 = await createGateway(
config.connectionProfileOrg2,
config.mspIdOrg2,
wallet
);
const networkOrg2 = await getNetwork(gatewayOrg2);
const contractsOrg2 = await getContracts(networkOrg2);
app.set(config.mspIdOrg2, contractsOrg2);
const assetContracts = new Map<string, Contract>();
assetContracts.set(config.mspIdOrg1, contractsOrg1.assetContract);
assetContracts.set(config.mspIdOrg2, contractsOrg2.assetContract);
startRetryLoop(assetContracts, redis);
app.set('redis', redis);
logger.debug('Adding block listener to %s network', config.blockListenerOrg);
if (config.blockListenerOrg === config.ORG1) {
await networkOrg1.addBlockListener(blockEventHandler(redis));
} else {
await networkOrg2.addBlockListener(blockEventHandler(redis));
}
app.use('/', healthRouter); app.use('/', healthRouter);
app.use('/api/assets', authenticateApiKey, assetsRouter); app.use('/api/assets', authenticateApiKey, assetsRouter);
app.use('/api/jobs', authenticateApiKey, jobsRouter);
app.use('/api/transactions', authenticateApiKey, transactionsRouter); app.use('/api/transactions', authenticateApiKey, transactionsRouter);
// For everything else // For everything else

View file

@ -4,81 +4,44 @@
import express, { Request, Response } from 'express'; import express, { Request, Response } from 'express';
import { Contract } from 'fabric-network'; import { Contract } from 'fabric-network';
import { protos } from 'fabric-protos';
import { getReasonPhrase, StatusCodes } from 'http-status-codes'; import { getReasonPhrase, StatusCodes } from 'http-status-codes';
import { Redis } from 'ioredis'; import { getTransactionValidationCode } from './fabric';
import { getTransactionDetails } from './redis';
import { evatuateTransaction } from './fabric';
import { logger } from './logger'; import { logger } from './logger';
import * as config from './config';
import { TransactionNotFoundError } from './errors'; import { TransactionNotFoundError } from './errors';
const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes; const { INTERNAL_SERVER_ERROR, NOT_FOUND, OK } = StatusCodes;
export const transactionsRouter = express.Router(); export const transactionsRouter = express.Router();
type Progress = 'ACCEPTED' | 'RETRYING' | 'DONE';
transactionsRouter.get( transactionsRouter.get(
'/:transactionId', '/:transactionId',
async (req: Request, res: Response) => { async (req: Request, res: Response) => {
const mspId = req.user as string;
const transactionId = req.params.transactionId; const transactionId = req.params.transactionId;
logger.debug('Read request received for transaction ID %s', transactionId); logger.debug('Read request received for transaction ID %s', transactionId);
let foundTransaction = false;
let progress: Progress = 'DONE';
let validationCode = '';
const mspId = req.user as string;
const qscc = req.app.get(mspId).qsccContract as Contract;
const redis = req.app.get('redis') as Redis;
try { try {
const savedTransaction = await getTransactionDetails( const qsccContract = req.app.get(mspId).qsccContract as Contract;
redis,
const validationCode = await getTransactionValidationCode(
qsccContract,
transactionId transactionId
); );
if (savedTransaction?.transactionState) { return res.status(OK).json({
foundTransaction = true; transactionId,
if (savedTransaction.retries > 0) { validationCode,
progress = 'RETRYING';
} else {
progress = 'ACCEPTED';
}
}
} catch (err) {
logger.error(
err,
'Redis error processing read request for transaction ID %s',
transactionId
);
return res.status(INTERNAL_SERVER_ERROR).json({
status: getReasonPhrase(INTERNAL_SERVER_ERROR),
timestamp: new Date().toISOString(),
}); });
}
try {
const data = await evatuateTransaction(
qscc,
'GetTransactionByID',
config.channelName,
transactionId
);
foundTransaction = true;
// TODO is it possible to use the BlockDecoder decodeTransaction
// function in fabric-common?
const processedTransaction = protos.ProcessedTransaction.decode(data);
validationCode =
protos.TxValidationCode[processedTransaction.validationCode];
} catch (err) { } catch (err) {
if (!(err instanceof TransactionNotFoundError)) { if (err instanceof TransactionNotFoundError) {
return res.status(NOT_FOUND).json({
status: getReasonPhrase(NOT_FOUND),
timestamp: new Date().toISOString(),
});
} else {
logger.error( logger.error(
err, { err },
'Fabric error processing read request for transaction ID %s', 'Error processing read request for transaction ID %s',
transactionId transactionId
); );
@ -88,19 +51,5 @@ transactionsRouter.get(
}); });
} }
} }
if (foundTransaction) {
return res.status(OK).json({
status: getReasonPhrase(OK),
progress: progress,
validationCode: validationCode,
timestamp: new Date().toISOString(),
});
} else {
return res.status(NOT_FOUND).json({
status: getReasonPhrase(NOT_FOUND),
timestamp: new Date().toISOString(),
});
}
} }
); );