From 6a5e5ddc12437a03b29093fbbf59d45bedfe72f8 Mon Sep 17 00:00:00 2001 From: Stanislav Jakuschevskij Date: Wed, 5 Mar 2025 18:26:33 +0100 Subject: [PATCH] Add off-chain-data go client application (#1269) Signed-off-by: Stanislav Jakuschevskij --- .../chaincode-javascript/lib/assetTransfer.js | 4 +- .../application-gateway-go/connect.go | 2 +- .../application-gateway-go/connect.go | 6 +- ci/scripts/run-test-network-off-chain.sh | 13 +- off_chain_data/README.md | 21 +- off_chain_data/application-go/app.go | 66 ++++ off_chain_data/application-go/connect.go | 128 ++++++++ .../application-go/contract/contract.go | 76 +++++ .../application-go/contract/model.go | 9 + off_chain_data/application-go/getAllAssets.go | 34 ++ off_chain_data/application-go/go.mod | 20 ++ off_chain_data/application-go/go.sum | 34 ++ off_chain_data/application-go/listen.go | 310 ++++++++++++++++++ off_chain_data/application-go/parser/block.go | 98 ++++++ .../application-go/parser/block_test.go | 144 ++++++++ .../parser/endorserTransaction.go | 101 ++++++ .../parser/namespaceReadWriteSet.go | 37 +++ .../application-go/parser/payload.go | 52 +++ .../application-go/parser/readWriteSet.go | 22 ++ .../application-go/parser/transaction.go | 39 +++ off_chain_data/application-go/store.go | 80 +++++ off_chain_data/application-go/transact.go | 133 ++++++++ off_chain_data/application-go/utils.go | 13 + .../application-typescript/src/utils.ts | 2 +- 24 files changed, 1434 insertions(+), 10 deletions(-) create mode 100644 off_chain_data/application-go/app.go create mode 100644 off_chain_data/application-go/connect.go create mode 100644 off_chain_data/application-go/contract/contract.go create mode 100644 off_chain_data/application-go/contract/model.go create mode 100644 off_chain_data/application-go/getAllAssets.go create mode 100644 off_chain_data/application-go/go.mod create mode 100644 off_chain_data/application-go/go.sum create mode 100644 off_chain_data/application-go/listen.go create mode 100644 off_chain_data/application-go/parser/block.go create mode 100644 off_chain_data/application-go/parser/block_test.go create mode 100644 off_chain_data/application-go/parser/endorserTransaction.go create mode 100644 off_chain_data/application-go/parser/namespaceReadWriteSet.go create mode 100644 off_chain_data/application-go/parser/payload.go create mode 100644 off_chain_data/application-go/parser/readWriteSet.go create mode 100644 off_chain_data/application-go/parser/transaction.go create mode 100644 off_chain_data/application-go/store.go create mode 100644 off_chain_data/application-go/transact.go create mode 100644 off_chain_data/application-go/utils.go diff --git a/asset-transfer-basic/chaincode-javascript/lib/assetTransfer.js b/asset-transfer-basic/chaincode-javascript/lib/assetTransfer.js index a68b75c6..d50554fc 100644 --- a/asset-transfer-basic/chaincode-javascript/lib/assetTransfer.js +++ b/asset-transfer-basic/chaincode-javascript/lib/assetTransfer.js @@ -79,9 +79,9 @@ class AssetTransfer extends Contract { const asset = { ID: id, Color: color, - Size: size, + Size: Number(size), Owner: owner, - AppraisedValue: appraisedValue, + AppraisedValue: Number(appraisedValue), }; // we insert data in alphabetic order using 'json-stringify-deterministic' and 'sort-keys-recursive' await ctx.stub.putState(id, Buffer.from(stringify(sortKeysRecursive(asset)))); diff --git a/asset-transfer-events/application-gateway-go/connect.go b/asset-transfer-events/application-gateway-go/connect.go index 7b21d8b1..bc707109 100755 --- a/asset-transfer-events/application-gateway-go/connect.go +++ b/asset-transfer-events/application-gateway-go/connect.go @@ -31,7 +31,7 @@ const ( func newGrpcConnection() *grpc.ClientConn { certificatePEM, err := os.ReadFile(tlsCertPath) if err != nil { - panic(fmt.Errorf("failed to read TLS certifcate file: %w", err)) + panic(fmt.Errorf("failed to read TLS certificate file: %w", err)) } certificate, err := identity.CertificateFromPEM(certificatePEM) diff --git a/asset-transfer-private-data/application-gateway-go/connect.go b/asset-transfer-private-data/application-gateway-go/connect.go index 3bd76863..334ffd9f 100644 --- a/asset-transfer-private-data/application-gateway-go/connect.go +++ b/asset-transfer-private-data/application-gateway-go/connect.go @@ -1,5 +1,5 @@ /* -Copyright 2022 IBM All Rights Reserved. +Copyright 2024 IBM All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ @@ -77,8 +77,8 @@ func newIdentity(certDirectoryPath, mspId string) *identity.X509Identity { } // newSign creates a function that generates a digital signature from a message digest using a private key. -func newSign(keyDirectoryPash string) identity.Sign { - privateKeyPEM, err := readFirstFile(keyDirectoryPash) +func newSign(keyDirectoryPath string) identity.Sign { + privateKeyPEM, err := readFirstFile(keyDirectoryPath) if err != nil { panic(fmt.Errorf("failed to read private key file: %w", err)) } diff --git a/ci/scripts/run-test-network-off-chain.sh b/ci/scripts/run-test-network-off-chain.sh index 872cda41..dc7bfc1e 100755 --- a/ci/scripts/run-test-network-off-chain.sh +++ b/ci/scripts/run-test-network-off-chain.sh @@ -41,11 +41,22 @@ print "Initializing Typescript off-chain data application" pushd ../off_chain_data/application-typescript rm -f checkpoint.json store.log npm install -print "Running the output app" +print "Running the Typescript app" SIMULATED_FAILURE_COUNT=1 npm start getAllAssets transact getAllAssets listen SIMULATED_FAILURE_COUNT=1 npm start listen popd +# Run off-chain data Go application +export CHAINCODE_NAME=go_off_chain_data +deployChaincode +print "Initializing Go off-chain data application" +pushd ../off_chain_data/application-go +rm -f checkpoint.json store.log +print "Running the Go app" +SIMULATED_FAILURE_COUNT=1 go run . getAllAssets transact getAllAssets listen +SIMULATED_FAILURE_COUNT=1 go run . listen +popd + # Run off-chain data Java application #createNetwork export CHAINCODE_NAME=off_chain_data diff --git a/off_chain_data/README.md b/off_chain_data/README.md index 3b931492..33bc6468 100644 --- a/off_chain_data/README.md +++ b/off_chain_data/README.md @@ -19,16 +19,19 @@ The client application provides several "commands" that can be invoked using the - **getAllAssets**: Retrieve the current details of all assets recorded on the ledger. See: - TypeScript: [application-typescript/src/getAllAssets.ts](application-typescript/src/getAllAssets.ts) - Java: [application-java/app/src/main/java/GetAllAssets.java](application-java/app/src/main/java/GetAllAssets.java) + - Go: [application-go/getAllAssets.go](application-go/getAllAssets.go) - **listen**: Listen for block events, and use them to replicate ledger updates in an off-chain data store. See: - TypeScript: [application-typescript/src/listen.ts](application-typescript/src/listen.ts) - Java: [application-java/app/src/main/java/Listen.java](application-java/app/src/main/java/Listen.java) + - Go: [application-go/listen.go](application-go/listen.go) - **transact**: Submit a set of transactions to create, modify and delete assets. See: - TypeScript: [application-typescript/src/transact.ts](application-typescript/src/transact.ts) - Java: [application-java/app/src/main/java/Transact.java](application-java/app/src/main/java/Transact.java) + - Go: [application-go/transact.go](application-go/transact.go) To keep the sample code concise, the **listen** command writes ledger updates to an output file named `store.log` in the current working directory (which for the Java sample is the `application-java/app` directory). A real implementation could write ledger updates directly to an off-chain data store of choice. You can inspect the information captured in this file as you run the sample. -Note that the **listen** command is is restartable and will resume event listening after the last successfully processed block / transaction. This is achieved using a checkpointer to persist the current listening position. Checkpoint state is persisted to a file named `checkpoint.json` in the current working directory. If no checkpoint state is present, event listening begins from the start of the ledger (block number zero). +Note that the **listen** command is restartable and will resume event listening after the last successfully processed block / transaction. This is achieved using a checkpointer to persist the current listening position. Checkpoint state is persisted to a file named `checkpoint.json` in the current working directory. If no checkpoint state is present, event listening begins from the start of the ledger (block number zero). ### Smart Contract @@ -65,6 +68,10 @@ The Fabric test network is used to deploy and run this sample. Follow these step npm install npm start transact listen + # To run the Go sample application + cd application-go + go run . transact listen + # To run the Java sample application cd application-java ./gradlew run --quiet --args='transact listen' @@ -79,6 +86,10 @@ The Fabric test network is used to deploy and run this sample. Follow these step cd application-typescript npm --silent start getAllAssets + # To run the Go sample application + cd application-go + go run . getAllAssets + # To run the Java sample application cd application-java ./gradlew run --quiet --args=getAllAssets @@ -93,6 +104,12 @@ The Fabric test network is used to deploy and run this sample. Follow these step SIMULATED_FAILURE_COUNT=5 npm start listen npm start listen + # To run the Go sample application + cd application-go + go run . transact + SIMULATED_FAILURE_COUNT=5 go run . listen + go run . listen + # To run the Java sample application cd application-java ./gradlew run --quiet --args=transact @@ -112,4 +129,4 @@ When you are finished, you can bring down the test network (from the `test-netwo ``` ./network.sh down -``` \ No newline at end of file +``` diff --git a/off_chain_data/application-go/app.go b/off_chain_data/application-go/app.go new file mode 100644 index 00000000..4469df2c --- /dev/null +++ b/off_chain_data/application-go/app.go @@ -0,0 +1,66 @@ +package main + +import ( + "errors" + "fmt" + "os" + "strings" + + "google.golang.org/grpc" +) + +type command func(grpc.ClientConnInterface) error + +var allCommands = map[string]command{ + "getAllAssets": getAllAssets, + "transact": transact, + "listen": listen, +} + +func main() { + commands := os.Args[1:] + if len(commands) == 0 { + printUsage() + panic(errors.New("missing command")) + } + + for _, name := range commands { + if _, exists := allCommands[name]; !exists { + printUsage() + panic(fmt.Errorf("unknown command: %s", name)) + } + fmt.Println("command:", name) + } + + client := newGrpcConnection() + defer client.Close() + + for _, name := range commands { + command := allCommands[name] + + if err := command(client); err != nil { + if errors.Is(err, errExpected) { + fmt.Println(err) + return + } + + panic(err) + } + } +} + +func printUsage() { + fmt.Println("Arguments: [ ...]") + fmt.Println("Available commands:", availableCommands()) +} + +func availableCommands() string { + result := make([]string, len(allCommands)) + i := 0 + for command := range allCommands { + result[i] = command + i++ + } + + return strings.Join(result, ", ") +} diff --git a/off_chain_data/application-go/connect.go b/off_chain_data/application-go/connect.go new file mode 100644 index 00000000..d4566132 --- /dev/null +++ b/off_chain_data/application-go/connect.go @@ -0,0 +1,128 @@ +package main + +import ( + "crypto/x509" + "fmt" + "os" + "path" + "time" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "github.com/hyperledger/fabric-gateway/pkg/hash" + "github.com/hyperledger/fabric-gateway/pkg/identity" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const peerName = "peer0.org1.example.com" + +var ( + channelName = envOrDefault("CHANNEL_NAME", "mychannel") + chaincodeName = envOrDefault("CHAINCODE_NAME", "basic") + mspID = envOrDefault("MSP_ID", "Org1MSP") + + // Path to crypto materials. + cryptoPath = envOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com") + + // Path to user private key directory. + keyDirectoryPath = envOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore") + + // Path to user certificate. + certPath = envOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem") + + // Path to peer tls certificate. + tlsCertPath = envOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt") + + // Gateway peer endpoint. + peerEndpoint = envOrDefault("PEER_ENDPOINT", "dns:///localhost:7051") + + // Gateway peer SSL host name override. + peerHostAlias = envOrDefault("PEER_HOST_ALIAS", peerName) +) + +func newGrpcConnection() *grpc.ClientConn { + certificatePEM, err := os.ReadFile(tlsCertPath) + if err != nil { + panic(fmt.Errorf("failed to read TLS certificate file: %w", err)) + } + + certificate, err := identity.CertificateFromPEM(certificatePEM) + if err != nil { + panic(err) + } + + certPool := x509.NewCertPool() + certPool.AddCert(certificate) + transportCredentials := credentials.NewClientTLSFromCert(certPool, peerHostAlias) + + connection, err := grpc.NewClient(peerEndpoint, grpc.WithTransportCredentials(transportCredentials)) + if err != nil { + panic(fmt.Errorf("failed to create gRPC connection: %w", err)) + } + + return connection +} + +func newConnectOptions(clientConnection grpc.ClientConnInterface) (identity.Identity, []client.ConnectOption) { + return newIdentity(), []client.ConnectOption{ + client.WithSign(newSign()), + client.WithHash(hash.SHA256), + client.WithClientConnection(clientConnection), + client.WithEvaluateTimeout(5 * time.Second), + client.WithEndorseTimeout(15 * time.Second), + client.WithSubmitTimeout(5 * time.Second), + client.WithCommitStatusTimeout(1 * time.Minute), + } +} + +func newIdentity() *identity.X509Identity { + certificatePEM, err := os.ReadFile(certPath) + if err != nil { + panic(fmt.Errorf("failed to read certificate file: %w", err)) + } + + certificate, err := identity.CertificateFromPEM(certificatePEM) + if err != nil { + panic(err) + } + + id, err := identity.NewX509Identity(mspID, certificate) + if err != nil { + panic(err) + } + + return id +} + +func newSign() identity.Sign { + privateKeyPEM, err := readFirstFile(keyDirectoryPath) + if err != nil { + panic(fmt.Errorf("failed to read private key file: %w", err)) + } + + privateKey, err := identity.PrivateKeyFromPEM(privateKeyPEM) + if err != nil { + panic(err) + } + + sign, err := identity.NewPrivateKeySign(privateKey) + if err != nil { + panic(err) + } + + return sign +} + +func readFirstFile(dirPath string) ([]byte, error) { + dir, err := os.Open(dirPath) + if err != nil { + return nil, err + } + + fileNames, err := dir.Readdirnames(1) + if err != nil { + return nil, err + } + + return os.ReadFile(path.Join(dirPath, fileNames[0])) +} diff --git a/off_chain_data/application-go/contract/contract.go b/off_chain_data/application-go/contract/contract.go new file mode 100644 index 00000000..5409cdee --- /dev/null +++ b/off_chain_data/application-go/contract/contract.go @@ -0,0 +1,76 @@ +package contract + +import ( + "encoding/json" + "strconv" + + "github.com/hyperledger/fabric-gateway/pkg/client" +) + +type AssetTransferBasic struct { + contract *client.Contract +} + +func NewAssetTransferBasic(contract *client.Contract) *AssetTransferBasic { + return &AssetTransferBasic{contract} +} + +func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) error { + if _, err := atb.contract.Submit( + "CreateAsset", + client.WithArguments( + anAsset.ID, + anAsset.Color, + strconv.FormatUint(anAsset.Size, 10), + anAsset.Owner, + strconv.FormatUint(anAsset.AppraisedValue, 10), + )); err != nil { + return err + } + return nil +} + +func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) (string, error) { + result, err := atb.contract.Submit( + "TransferAsset", + client.WithArguments( + id, + newOwner, + ), + ) + if err != nil { + return "", err + } + + return string(result), nil +} + +func (atb *AssetTransferBasic) DeleteAsset(id string) error { + if _, err := atb.contract.Submit( + "DeleteAsset", + client.WithArguments( + id, + ), + ); err != nil { + return err + } + return nil +} + +func (atb *AssetTransferBasic) GetAllAssets() ([]Asset, error) { + assetsRaw, err := atb.contract.Evaluate("GetAllAssets") + if err != nil { + return nil, err + } + + if len(assetsRaw) == 0 { + return nil, nil + } + + assets := []Asset{} + if err := json.Unmarshal(assetsRaw, &assets); err != nil { + return nil, err + } + + return assets, nil +} diff --git a/off_chain_data/application-go/contract/model.go b/off_chain_data/application-go/contract/model.go new file mode 100644 index 00000000..a3dece53 --- /dev/null +++ b/off_chain_data/application-go/contract/model.go @@ -0,0 +1,9 @@ +package contract + +type Asset struct { + ID string `json:"ID"` + Color string `json:"Color"` + Size uint64 `json:"Size"` + Owner string `json:"Owner"` + AppraisedValue uint64 `json:"AppraisedValue"` +} diff --git a/off_chain_data/application-go/getAllAssets.go b/off_chain_data/application-go/getAllAssets.go new file mode 100644 index 00000000..95619be9 --- /dev/null +++ b/off_chain_data/application-go/getAllAssets.go @@ -0,0 +1,34 @@ +package main + +import ( + "encoding/json" + "fmt" + atb "offchaindata/contract" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "google.golang.org/grpc" +) + +func getAllAssets(clientConnection grpc.ClientConnInterface) error { + id, options := newConnectOptions(clientConnection) + gateway, err := client.Connect(id, options...) + if err != nil { + return err + } + defer gateway.Close() + + contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) + smartContract := atb.NewAssetTransferBasic(contract) + assets, err := smartContract.GetAllAssets() + if err != nil { + return err + } + + formatted, err := json.MarshalIndent(assets, "", " ") + if err != nil { + return err + } + fmt.Println(string(formatted)) + + return nil +} diff --git a/off_chain_data/application-go/go.mod b/off_chain_data/application-go/go.mod new file mode 100644 index 00000000..69ed2296 --- /dev/null +++ b/off_chain_data/application-go/go.mod @@ -0,0 +1,20 @@ +module offchaindata + +go 1.22.0 + +require ( + github.com/google/uuid v1.6.0 + github.com/hyperledger/fabric-gateway v1.7.0 + github.com/hyperledger/fabric-protos-go-apiv2 v0.3.4 + google.golang.org/grpc v1.68.0-dev + google.golang.org/protobuf v1.35.2 +) + +require ( + github.com/miekg/pkcs11 v1.1.1 // indirect + golang.org/x/crypto v0.29.0 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect +) diff --git a/off_chain_data/application-go/go.sum b/off_chain_data/application-go/go.sum new file mode 100644 index 00000000..e3d2874e --- /dev/null +++ b/off_chain_data/application-go/go.sum @@ -0,0 +1,34 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hyperledger/fabric-gateway v1.7.0 h1:bd1quU8qYPYqYO69m1tPIDSjB+D+u/rBJfE1eWFcpjY= +github.com/hyperledger/fabric-gateway v1.7.0/go.mod h1:TItDGnq71eJcgz5TW+m5Sq3kWGp0AEI1HPCNxj0Eu7k= +github.com/hyperledger/fabric-protos-go-apiv2 v0.3.4 h1:YJrd+gMaeY0/vsN0aS0QkEKTivGoUnSRIXxGJ7KI+Pc= +github.com/hyperledger/fabric-protos-go-apiv2 v0.3.4/go.mod h1:bau/6AJhvEcu9GKKYHlDXAxXKzYNfhP6xu2GXuxEcFk= +github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU= +github.com/miekg/pkcs11 v1.1.1/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/grpc v1.68.0-dev h1:Qao/m2HpklhJt2QbpdRutxyNfRuwM8nGPpmi2UkuEHw= +google.golang.org/grpc v1.68.0-dev/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go new file mode 100644 index 00000000..58d799d4 --- /dev/null +++ b/off_chain_data/application-go/listen.go @@ -0,0 +1,310 @@ +package main + +import ( + "context" + "fmt" + "offchaindata/parser" + "os" + "os/signal" + "slices" + "strconv" + "strings" + "syscall" + + "github.com/hyperledger/fabric-gateway/pkg/client" + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" + "google.golang.org/grpc" +) + +func listen(clientConnection grpc.ClientConnInterface) error { + id, options := newConnectOptions(clientConnection) + gateway, err := client.Connect(id, options...) + if err != nil { + return err + } + defer func() { + gateway.Close() + fmt.Println("Gateway closed.") + }() + + checkpointFile := envOrDefault("CHECKPOINT_FILE", "checkpoint.json") + checkpointer, err := client.NewFileCheckpointer(checkpointFile) + if err != nil { + return err + } + defer func() { + checkpointer.Close() + fmt.Println("Checkpointer closed.") + }() + fmt.Println("Start event listening from block", checkpointer.BlockNumber()) + fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) + + simulatedFailureCount := initSimulatedFailureCount() + if simulatedFailureCount > 0 { + fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount) + } + storeFile := envOrDefault("STORE_FILE", "store.log") + offChainStore := newOffChainStore(storeFile, simulatedFailureCount) + + ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer func() { + close() + fmt.Println("Context closed.") + }() + + network := gateway.GetNetwork(channelName) + blocks, err := network.BlockEvents( + ctx, + // Used only if there is no checkpoint block number. + // Order matters. WithStartBlock must be set before + // WithCheckpoint to work. + client.WithStartBlock(0), + client.WithCheckpoint(checkpointer), + ) + if err != nil { + return err + } + + for blockProto := range blocks { + aBlockProcessor := blockProcessor{ + parser.ParseBlock(blockProto), + checkpointer, + offChainStore, + } + + if err := aBlockProcessor.process(); err != nil { + return err + } + } + + fmt.Println("\nShutting down listener gracefully...") + return nil +} + +func initSimulatedFailureCount() uint { + valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") + result, err := strconv.ParseUint(valueAsString, 10, 0) + if err != nil { + panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) + } + + return uint(result) +} + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type store interface { + write(ledgerUpdate) error +} + +// Ledger update made by a specific transaction. +type ledgerUpdate struct { + BlockNumber uint64 + TransactionID string + Writes []write +} + +// Description of a ledger Write that can be applied to an off-chain data store. +type write struct { + // Channel whose ledger is being updated. + ChannelName string `json:"channelName"` + // Namespace within the ledger. + Namespace string `json:"namespace"` + // Key name within the ledger namespace. + Key string `json:"key"` + // Whether the key and associated value are being deleted. + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` +} + +type blockProcessor struct { + parsedBlock *parser.Block + checkpointer *client.FileCheckpointer + store store +} + +func (b *blockProcessor) process() error { + fmt.Println("\nReceived block", b.parsedBlock.Number()) + + validTransactions, err := b.validTransactions() + if err != nil { + return err + } + + for _, validTransaction := range validTransactions { + txProcessor := transactionProcessor{ + b.parsedBlock.Number(), + validTransaction, + b.store, + } + if err := txProcessor.process(); err != nil { + return err + } + + transactionID := validTransaction.ChannelHeader().GetTxId() + if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil { + return err + } + } + + if err := b.checkpointer.CheckpointBlock(b.parsedBlock.Number()); err != nil { + return err + } + + return nil +} + +func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) { + newTransactions, err := b.getNewTransactions() + if err != nil { + return nil, err + } + + result := []*parser.Transaction{} + for _, transaction := range newTransactions { + if transaction.IsValid() { + result = append(result, transaction) + } + } + return result, nil +} + +func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) { + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return nil, err + } + + lastTransactionID := b.checkpointer.TransactionID() + if lastTransactionID == "" { + // No previously processed transactions within this block so all are new + return transactions, nil + } + + // Ignore transactions up to the last processed transaction ID + lastProcessedIndex, err := b.findLastProcessedIndex() + if err != nil { + return nil, err + } + return transactions[lastProcessedIndex+1:], nil +} + +func (b *blockProcessor) findLastProcessedIndex() (int, error) { + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return 0, err + } + + blockTransactionIDs := []string{} + for _, transaction := range transactions { + blockTransactionIDs = append(blockTransactionIDs, transaction.ChannelHeader().GetTxId()) + } + + lastTransactionID := b.checkpointer.TransactionID() + lastProcessedIndex := -1 + for index, id := range blockTransactionIDs { + if id == lastTransactionID { + lastProcessedIndex = index + } + } + + if lastProcessedIndex < 0 { + err = fmt.Errorf( + "checkpoint transaction ID %s not found in block %d containing transactions: %s", + lastTransactionID, + b.parsedBlock.Number(), + strings.Join(blockTransactionIDs, ", "), + ) + return lastProcessedIndex, err + } + + return lastProcessedIndex, nil +} + +type transactionProcessor struct { + blockNumber uint64 + transaction *parser.Transaction + store store +} + +func (t *transactionProcessor) process() error { + transactionID := t.transaction.ChannelHeader().GetTxId() + + writes, err := t.writes() + if err != nil { + return err + } + + if len(writes) == 0 { + fmt.Println("Skipping read-only or system transaction", transactionID) + return nil + } + + fmt.Println("Process transaction", transactionID) + if err := t.store.write(ledgerUpdate{ + BlockNumber: t.blockNumber, + TransactionID: transactionID, + Writes: writes, + }); err != nil { + return err + } + + return nil +} + +func (t *transactionProcessor) writes() ([]write, error) { + nsReadWriteSets, err := t.nonSystemCCReadWriteSets() + if err != nil { + return nil, err + } + + result := []write{} + for _, nsReadWriteSet := range nsReadWriteSets { + kvReadWriteSet, err := nsReadWriteSet.ReadWriteSet() + if err != nil { + return nil, err + } + + result = t.newWrites(kvReadWriteSet, nsReadWriteSet.Namespace()) + } + + return result, nil +} + +func (t *transactionProcessor) nonSystemCCReadWriteSets() ([]*parser.NamespaceReadWriteSet, error) { + nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() + if err != nil { + return nil, err + } + + return slices.DeleteFunc(nsReadWriteSets, func(nsReadWriteSet *parser.NamespaceReadWriteSet) bool { + return t.isSystemChaincode(nsReadWriteSet.Namespace()) + }), nil +} + +func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { + systemChaincodeNames := []string{ + "_lifecycle", + "cscc", + "escc", + "lscc", + "qscc", + "vscc", + } + return slices.Contains(systemChaincodeNames, chaincodeName) +} + +func (t *transactionProcessor) newWrites(kvReadWriteSet *kvrwset.KVRWSet, namespace string) []write { + result := []write{} + for _, kvWrite := range kvReadWriteSet.GetWrites() { + result = append(result, write{ + ChannelName: t.transaction.ChannelHeader().GetChannelId(), + Namespace: namespace, + Key: kvWrite.GetKey(), + IsDelete: kvWrite.GetIsDelete(), + Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output + }) + } + + return result +} diff --git a/off_chain_data/application-go/parser/block.go b/off_chain_data/application-go/parser/block.go new file mode 100644 index 00000000..2200284b --- /dev/null +++ b/off_chain_data/application-go/parser/block.go @@ -0,0 +1,98 @@ +package parser + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go-apiv2/common" + "google.golang.org/protobuf/proto" +) + +type Block struct { + block *common.Block + transactions func() ([]*Transaction, error) +} + +func ParseBlock(block *common.Block) *Block { + result := &Block{block, nil} + result.transactions = sync.OnceValues(result.unmarshalTransactions) + return result +} + +func (b *Block) Number() uint64 { + return b.block.GetHeader().GetNumber() +} + +func (b *Block) Transactions() ([]*Transaction, error) { + return b.transactions() +} + +func (b *Block) unmarshalTransactions() ([]*Transaction, error) { + envelopes, err := b.unmarshalEnvelopes() + if err != nil { + return nil, err + } + + commonPayloads, err := b.unmarshalPayloadsFrom(envelopes) + if err != nil { + return nil, err + } + + payloads, err := b.parse(commonPayloads) + if err != nil { + return nil, err + } + + return b.createTransactionsFrom(payloads), nil +} + +func (b *Block) unmarshalEnvelopes() ([]*common.Envelope, error) { + var result []*common.Envelope + for _, blockData := range b.block.GetData().GetData() { + envelope := &common.Envelope{} + if err := proto.Unmarshal(blockData, envelope); err != nil { + return nil, err + } + result = append(result, envelope) + } + return result, nil +} + +func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Payload, error) { + var result []*common.Payload + for _, envelope := range envelopes { + commonPayload := &common.Payload{} + if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { + return nil, err + } + result = append(result, commonPayload) + } + return result, nil +} + +func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) { + validationCodes := b.block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER] + + var result []*payload + for i, commonPayload := range commonPayloads { + statusCode := validationCodes[i] + + payload, err := parsePayload(commonPayload, int32(statusCode)) + if err != nil { + return nil, err + } + + if payload.isEndorserTransaction() { + result = append(result, payload) + } + } + + return result, nil +} + +func (*Block) createTransactionsFrom(payloads []*payload) []*Transaction { + var result []*Transaction + for _, payload := range payloads { + result = append(result, newTransaction(payload)) + } + return result +} diff --git a/off_chain_data/application-go/parser/block_test.go b/off_chain_data/application-go/parser/block_test.go new file mode 100644 index 00000000..6e0f94b6 --- /dev/null +++ b/off_chain_data/application-go/parser/block_test.go @@ -0,0 +1,144 @@ +package parser + +import ( + "encoding/json" + "testing" + + atb "offchaindata/contract" + + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" + "github.com/hyperledger/fabric-protos-go-apiv2/peer" + "google.golang.org/protobuf/proto" +) + +func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) { + nsReadWriteSetFake, expectedNamespace, expectedAsset := nsReadWriteSetFake() + + transaction := &peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoMarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoMarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoMarshalOrPanic(&peer.ChaincodeAction{ + Results: protoMarshalOrPanic(&rwset.TxReadWriteSet{ + NsRwset: []*rwset.NsReadWriteSet{nsReadWriteSetFake}, + }), + }), + }), + }, + }), + }, + }, + } + + parsedEndorserTransaction := parseEndorserTransaction(transaction) + readWriteSets, err := parsedEndorserTransaction.readWriteSets() + if err != nil { + t.Fatal("unexpected error:", err) + } + + if len(readWriteSets) != 1 { + t.Fatal("expected 1 ReadWriteSet, got", len(readWriteSets)) + } + + assertReadWriteSet( + readWriteSets[0].namespaceReadWriteSets()[0], + expectedNamespace, + expectedAsset, + t, + ) +} + +func assertReadWriteSet( + parsedNsRwSet *NamespaceReadWriteSet, + expectedNamespace string, + expectedAsset atb.Asset, + t *testing.T, +) { + if parsedNsRwSet.Namespace() != expectedNamespace { + t.Errorf("expected namespace %s, got %s", expectedNamespace, parsedNsRwSet.Namespace()) + } + + actualKVRWSet, err := parsedNsRwSet.ReadWriteSet() + if err != nil { + t.Fatal("unexpected error:", err) + } + if len(actualKVRWSet.Writes) != 1 { + t.Fatal("expected 1 write, got", len(actualKVRWSet.Writes)) + } + + actualWrite := actualKVRWSet.Writes[0] + if actualWrite.GetKey() != expectedAsset.ID { + t.Errorf("expected key %s, got %s", expectedAsset.ID, actualWrite.GetKey()) + } + + if string(actualWrite.GetValue()) != string(jsonMarshalOrPanic(expectedAsset)) { + t.Errorf("expected value %s, got %s", jsonMarshalOrPanic(expectedAsset), actualWrite.GetValue()) + } +} + +func Test_ReadWriteSetWrapping(t *testing.T) { + nsReadWriteSetFake, _, _ := nsReadWriteSetFake() + + txReadWriteSetFake := &rwset.TxReadWriteSet{ + NsRwset: []*rwset.NsReadWriteSet{nsReadWriteSetFake}, + } + + parsedRwSet := parseReadWriteSet(txReadWriteSetFake) + if len(parsedRwSet.namespaceReadWriteSets()) != 1 { + t.Fatal("expected 1 NamespaceReadWriteSet, got", len(parsedRwSet.namespaceReadWriteSets())) + } +} + +func Test_NamespaceReadWriteSetParsing(t *testing.T) { + nsReadWriteSetFake, expectedNamespace, expectedAsset := nsReadWriteSetFake() + + parsedNsRwSet := parseNamespaceReadWriteSet(nsReadWriteSetFake) + assertReadWriteSet( + parsedNsRwSet, + expectedNamespace, + expectedAsset, + t, + ) +} + +func nsReadWriteSetFake() (*rwset.NsReadWriteSet, string, atb.Asset) { + expectedNamespace := "basic" + expectedAsset := atb.Asset{ + ID: "id-1", + Color: "green", + Size: 8, + Owner: "Alice", + AppraisedValue: 346, + } + + result := &rwset.NsReadWriteSet{ + Namespace: expectedNamespace, + Rwset: protoMarshalOrPanic(&kvrwset.KVRWSet{ + Writes: []*kvrwset.KVWrite{{ + Key: expectedAsset.ID, + Value: []byte(jsonMarshalOrPanic(expectedAsset)), + }}, + }), + } + + return result, expectedNamespace, expectedAsset +} + +func protoMarshalOrPanic(v proto.Message) []byte { + result, err := proto.Marshal(v) + if err != nil { + panic(err) + } + return result +} + +func jsonMarshalOrPanic(v any) []byte { + result, err := json.Marshal(v) + if err != nil { + panic(err) + } + return result +} diff --git a/off_chain_data/application-go/parser/endorserTransaction.go b/off_chain_data/application-go/parser/endorserTransaction.go new file mode 100644 index 00000000..cd73802a --- /dev/null +++ b/off_chain_data/application-go/parser/endorserTransaction.go @@ -0,0 +1,101 @@ +package parser + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" + "github.com/hyperledger/fabric-protos-go-apiv2/peer" + "google.golang.org/protobuf/proto" +) + +type endorserTransaction struct { + transaction *peer.Transaction + readWriteSets func() ([]*readWriteSet, error) +} + +func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransaction { + result := &endorserTransaction{transaction, nil} + result.readWriteSets = sync.OnceValues(result.unmarshalReadWriteSets) + return result +} + +func (p *endorserTransaction) unmarshalReadWriteSets() ([]*readWriteSet, error) { + chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads() + if err != nil { + return nil, err + } + + proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeActionPayloads) + if err != nil { + return nil, err + } + + chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads) + if err != nil { + return nil, err + } + + txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions) + if err != nil { + return nil, err + } + + return p.parseReadWriteSets(txReadWriteSets), nil +} + +func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) { + var result []*peer.ChaincodeActionPayload + for _, transactionAction := range p.transaction.GetActions() { + chaincodeActionPayload := &peer.ChaincodeActionPayload{} + if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil { + return nil, err + } + result = append(result, chaincodeActionPayload) + } + return result, nil +} + +func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ProposalResponsePayload, error) { + var result []*peer.ProposalResponsePayload + for _, chaincodeActionPayload := range chaincodeActionPayloads { + proposalResponsePayload := &peer.ProposalResponsePayload{} + if err := proto.Unmarshal(chaincodeActionPayload.GetAction().GetProposalResponsePayload(), proposalResponsePayload); err != nil { + return nil, err + } + result = append(result, proposalResponsePayload) + } + return result, nil +} + +func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) ([]*peer.ChaincodeAction, error) { + var result []*peer.ChaincodeAction + for _, proposalResponsePayload := range proposalResponsePayloads { + chaincodeAction := &peer.ChaincodeAction{} + if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil { + return nil, err + } + result = append(result, chaincodeAction) + } + return result, nil +} + +func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) ([]*rwset.TxReadWriteSet, error) { + var result []*rwset.TxReadWriteSet + for _, chaincodeAction := range chaincodeActions { + txReadWriteSet := &rwset.TxReadWriteSet{} + if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil { + return nil, err + } + result = append(result, txReadWriteSet) + } + return result, nil +} + +func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet { + var result []*readWriteSet + for _, txReadWriteSet := range txReadWriteSets { + parsedReadWriteSet := parseReadWriteSet(txReadWriteSet) + result = append(result, parsedReadWriteSet) + } + return result +} diff --git a/off_chain_data/application-go/parser/namespaceReadWriteSet.go b/off_chain_data/application-go/parser/namespaceReadWriteSet.go new file mode 100644 index 00000000..ce108d4a --- /dev/null +++ b/off_chain_data/application-go/parser/namespaceReadWriteSet.go @@ -0,0 +1,37 @@ +package parser + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" + "google.golang.org/protobuf/proto" +) + +type NamespaceReadWriteSet struct { + nsReadWriteSet *rwset.NsReadWriteSet + readWriteSet func() (*kvrwset.KVRWSet, error) +} + +func parseNamespaceReadWriteSet(nsRwSet *rwset.NsReadWriteSet) *NamespaceReadWriteSet { + result := &NamespaceReadWriteSet{nsRwSet, nil} + result.readWriteSet = sync.OnceValues(result.unmarshalReadWriteSet) + return result +} + +func (p *NamespaceReadWriteSet) Namespace() string { + return p.nsReadWriteSet.GetNamespace() +} + +func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) { + return p.readWriteSet() +} + +func (p *NamespaceReadWriteSet) unmarshalReadWriteSet() (*kvrwset.KVRWSet, error) { + result := &kvrwset.KVRWSet{} + if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), result); err != nil { + return nil, err + } + + return result, nil +} diff --git a/off_chain_data/application-go/parser/payload.go b/off_chain_data/application-go/parser/payload.go new file mode 100644 index 00000000..21a23b60 --- /dev/null +++ b/off_chain_data/application-go/parser/payload.go @@ -0,0 +1,52 @@ +package parser + +import ( + "fmt" + + "github.com/hyperledger/fabric-protos-go-apiv2/common" + "github.com/hyperledger/fabric-protos-go-apiv2/peer" + "google.golang.org/protobuf/proto" +) + +type payload struct { + commonPayload *common.Payload + statusCode int32 + channelHeader *common.ChannelHeader +} + +func parsePayload(commonPayload *common.Payload, statusCode int32) (*payload, error) { + channelHeader, err := unmarshalChannelHeaderFrom(commonPayload) + if err != nil { + return nil, err + } + return &payload{commonPayload, statusCode, channelHeader}, nil +} + +func unmarshalChannelHeaderFrom(commonPayload *common.Payload) (*common.ChannelHeader, error) { + result := &common.ChannelHeader{} + if err := proto.Unmarshal(commonPayload.GetHeader().GetChannelHeader(), result); err != nil { + return nil, err + } + return result, nil +} + +func (p *payload) endorserTransaction() (*endorserTransaction, error) { + if !p.isEndorserTransaction() { + return nil, fmt.Errorf("unexpected payload type: %d", p.channelHeader.GetType()) + } + + result := &peer.Transaction{} + if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil { + return nil, err + } + + return parseEndorserTransaction(result), nil +} + +func (p *payload) isEndorserTransaction() bool { + return p.channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION) +} + +func (p *payload) isValid() bool { + return p.statusCode == int32(peer.TxValidationCode_VALID) +} diff --git a/off_chain_data/application-go/parser/readWriteSet.go b/off_chain_data/application-go/parser/readWriteSet.go new file mode 100644 index 00000000..d795c0b8 --- /dev/null +++ b/off_chain_data/application-go/parser/readWriteSet.go @@ -0,0 +1,22 @@ +package parser + +import ( + "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" +) + +type readWriteSet struct { + readWriteSet *rwset.TxReadWriteSet +} + +func parseReadWriteSet(rwSet *rwset.TxReadWriteSet) *readWriteSet { + return &readWriteSet{rwSet} +} + +func (p *readWriteSet) namespaceReadWriteSets() []*NamespaceReadWriteSet { + result := []*NamespaceReadWriteSet{} + for _, nsReadWriteSet := range p.readWriteSet.GetNsRwset() { + parsedNamespaceReadWriteSet := parseNamespaceReadWriteSet(nsReadWriteSet) + result = append(result, parsedNamespaceReadWriteSet) + } + return result +} diff --git a/off_chain_data/application-go/parser/transaction.go b/off_chain_data/application-go/parser/transaction.go new file mode 100644 index 00000000..56ddbf31 --- /dev/null +++ b/off_chain_data/application-go/parser/transaction.go @@ -0,0 +1,39 @@ +package parser + +import ( + "github.com/hyperledger/fabric-protos-go-apiv2/common" +) + +type Transaction struct { + payload *payload +} + +func newTransaction(payload *payload) *Transaction { + return &Transaction{payload} +} + +func (t *Transaction) ChannelHeader() *common.ChannelHeader { + return t.payload.channelHeader +} + +func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) { + endorserTransaction, err := t.payload.endorserTransaction() + if err != nil { + return nil, err + } + + txReadWriteSets, err := endorserTransaction.readWriteSets() + if err != nil { + return nil, err + } + + var result []*NamespaceReadWriteSet + for _, readWriteSet := range txReadWriteSets { + result = append(result, readWriteSet.namespaceReadWriteSets()...) + } + return result, nil +} + +func (t *Transaction) IsValid() bool { + return t.payload.isValid() +} diff --git a/off_chain_data/application-go/store.go b/off_chain_data/application-go/store.go new file mode 100644 index 00000000..2764e898 --- /dev/null +++ b/off_chain_data/application-go/store.go @@ -0,0 +1,80 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "os" +) + +var errExpected = errors.New("expected error: simulated write failure") + +type offChainStore struct { + path string + simulatedFailureCount, transactionCount uint +} + +func newOffChainStore(path string, simulatedFailureCount uint) *offChainStore { + return &offChainStore{ + path, + uint(simulatedFailureCount), + 0, + } +} + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +// This implementation just writes to a file. +func (ocs *offChainStore) write(data ledgerUpdate) error { + if err := ocs.simulateFailureIfRequired(); err != nil { + return err + } + + writes, err := ocs.marshal(data.Writes) + if err != nil { + return err + } + + return ocs.persist(writes) +} + +func (ocs *offChainStore) simulateFailureIfRequired() error { + if ocs.simulatedFailureCount > 0 && ocs.transactionCount >= ocs.simulatedFailureCount { + ocs.transactionCount = 0 + return errExpected + } + + ocs.transactionCount += 1 + + return nil +} + +func (ocs *offChainStore) marshal(writes []write) (string, error) { + var marshaledWrites string + for _, write := range writes { + marshaled, err := json.Marshal(write) + if err != nil { + return "", err + } + + marshaledWrites += string(marshaled) + "\n" + } + + return marshaledWrites, nil +} + +func (ocs *offChainStore) persist(marshaledWrites string) error { + f, err := os.OpenFile(ocs.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + + if _, writeErr := f.Write([]byte(marshaledWrites)); writeErr != nil { + if closeErr := f.Close(); closeErr != nil { + return fmt.Errorf("write error: %v, close error: %v", writeErr, closeErr) + } + + return writeErr + } + + return f.Close() +} diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go new file mode 100644 index 00000000..daaea5b4 --- /dev/null +++ b/off_chain_data/application-go/transact.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "fmt" + "math/rand/v2" + "sync" + + atb "offchaindata/contract" + + "github.com/google/uuid" + "github.com/hyperledger/fabric-gateway/pkg/client" + "google.golang.org/grpc" +) + +var owners = []string{"alice", "bob", "charlie"} + +func transact(clientConnection grpc.ClientConnInterface) error { + id, options := newConnectOptions(clientConnection) + gateway, err := client.Connect(id, options...) + if err != nil { + return err + } + defer func() { + gateway.Close() + fmt.Println("Gateway closed.") + }() + + contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) + smartContract := atb.NewAssetTransferBasic(contract) + app := newTransactApp(smartContract) + return app.run() +} + +type transactApp struct { + smartContract *atb.AssetTransferBasic + batchSize int +} + +func newTransactApp(smartContract *atb.AssetTransferBasic) *transactApp { + return &transactApp{smartContract, 10} +} + +func (t *transactApp) run() error { + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(nil) + + var wg sync.WaitGroup + + for range t.batchSize { + wg.Add(1) + go func() { + defer wg.Done() + + select { + case <-ctx.Done(): + return + default: + if err := t.transact(); err != nil { + cancel(err) + return + } + } + }() + } + + wg.Wait() + + return context.Cause(ctx) +} + +func (t *transactApp) transact() error { + anAsset, err := newAsset() + if err != nil { + return err + } + + if err := t.smartContract.CreateAsset(anAsset); err != nil { + return err + } + fmt.Println("Created asset", anAsset.ID) + + // Transfer randomly 1 in 2 assets to a new owner. + if rand.N(2) == 0 { + newOwner := differentElement(owners, anAsset.Owner) + oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner) + if err != nil { + return err + } + fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner) + } + + // Delete randomly 1 in 4 created assets. + if rand.N(4) == 0 { + if err := t.smartContract.DeleteAsset(anAsset.ID); err != nil { + return err + } + fmt.Println("Deleted asset", anAsset.ID) + } + + return nil +} + +func newAsset() (atb.Asset, error) { + id, err := uuid.NewRandom() + if err != nil { + return atb.Asset{}, err + } + + return atb.Asset{ + ID: id.String(), + Color: randomElement([]string{"red", "green", "blue"}), + Size: uint64(rand.N(10) + 1), + Owner: randomElement(owners), + AppraisedValue: uint64(rand.N(1000) + 1), + }, nil +} + +// Pick a random element from an array. +func randomElement(values []string) string { + return values[rand.N(len(values))] +} + +// Pick a random element from an array, excluding the current value. +func differentElement(values []string, currentValue string) string { + candidateValues := []string{} + for _, v := range values { + if v != currentValue { + candidateValues = append(candidateValues, v) + } + } + return randomElement(candidateValues) +} diff --git a/off_chain_data/application-go/utils.go b/off_chain_data/application-go/utils.go new file mode 100644 index 00000000..d8569278 --- /dev/null +++ b/off_chain_data/application-go/utils.go @@ -0,0 +1,13 @@ +package main + +import ( + "os" +) + +func envOrDefault(key, defaultValue string) string { + result := os.Getenv(key) + if result == "" { + return defaultValue + } + return result +} diff --git a/off_chain_data/application-typescript/src/utils.ts b/off_chain_data/application-typescript/src/utils.ts index e798a201..756b2e3a 100644 --- a/off_chain_data/application-typescript/src/utils.ts +++ b/off_chain_data/application-typescript/src/utils.ts @@ -67,7 +67,7 @@ export function assertDefined(value: T | null | undefined, message: string): */ export function cache(f: () => T): () => T { let value: T | undefined; - return () => { + return (): T => { if (value === undefined) { value = f(); }