Add second batch of pull request rework

- switch to ClientConnInterface
- use command type alias for map of commands
- add error return to command functions and handle in app.go
- inline formatJSON function in getAllAssets.go
- replace most panics with error returns
- remove error wrapping in listen.go and further down the line
- use strconv.ParseUint instead of ParseFloat
- use WithCancelCause in transact.go to grab and propagate error from
  go routine
- unmarshal and return []Asset in atb.GetAllAssets
- switch to rand package
- remove dependency to protobuf reflect package
- switch to sync.OnceValues for caching parser
- fixed typo in events sample connect.go

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2025-02-10 19:57:51 +01:00
parent fd1a1fc38b
commit f6858cc7e1
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
15 changed files with 183 additions and 227 deletions

View file

@ -31,7 +31,7 @@ const (
func newGrpcConnection() *grpc.ClientConn { func newGrpcConnection() *grpc.ClientConn {
certificatePEM, err := os.ReadFile(tlsCertPath) certificatePEM, err := os.ReadFile(tlsCertPath)
if err != nil { if err != nil {
panic(fmt.Errorf("failed to read TLS certifcate file: %w", err)) panic(fmt.Errorf("failed to read TLS certificate file: %w", err))
} }
certificate, err := identity.CertificateFromPEM(certificatePEM) certificate, err := identity.CertificateFromPEM(certificatePEM)

View file

@ -1,9 +1,3 @@
/*
* Copyright 2024 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package main package main
import ( import (
@ -15,7 +9,9 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var allCommands = map[string]func(*grpc.ClientConn){ type command func(grpc.ClientConnInterface) error
var allCommands = map[string]command{
"getAllAssets": getAllAssets, "getAllAssets": getAllAssets,
"transact": transact, "transact": transact,
"listen": listen, "listen": listen,
@ -41,7 +37,10 @@ func main() {
for _, name := range commands { for _, name := range commands {
command := allCommands[name] command := allCommands[name]
command(client)
if err := command(client); err != nil {
panic(err)
}
} }
} }

View file

@ -1,9 +1,3 @@
/*
* Copyright 2024 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package main package main
import ( import (
@ -69,7 +63,7 @@ func newGrpcConnection() *grpc.ClientConn {
return connection return connection
} }
func newConnectOptions(clientConnection *grpc.ClientConn) (identity.Identity, []client.ConnectOption) { func newConnectOptions(clientConnection grpc.ClientConnInterface) (identity.Identity, []client.ConnectOption) {
return newIdentity(), []client.ConnectOption{ return newIdentity(), []client.ConnectOption{
client.WithSign(newSign()), client.WithSign(newSign()),
client.WithHash(hash.SHA256), client.WithHash(hash.SHA256),

View file

@ -1,12 +1,7 @@
/*
* Copyright 2024 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package contract package contract
import ( import (
"fmt" "encoding/json"
"strconv" "strconv"
"github.com/hyperledger/fabric-gateway/pkg/client" "github.com/hyperledger/fabric-gateway/pkg/client"
@ -30,7 +25,7 @@ func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) error {
anAsset.Owner, anAsset.Owner,
strconv.FormatUint(anAsset.AppraisedValue, 10), strconv.FormatUint(anAsset.AppraisedValue, 10),
)); err != nil { )); err != nil {
return fmt.Errorf("in CreateAsset: %w", err) return err
} }
return nil return nil
} }
@ -44,7 +39,7 @@ func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) (string, error
), ),
) )
if err != nil { if err != nil {
return "", fmt.Errorf("in TransferAsset: %w", err) return "", err
} }
return string(result), nil return string(result), nil
@ -57,15 +52,25 @@ func (atb *AssetTransferBasic) DeleteAsset(id string) error {
id, id,
), ),
); err != nil { ); err != nil {
return fmt.Errorf("in DeleteAsset: %w", err) return err
} }
return nil return nil
} }
func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) { func (atb *AssetTransferBasic) GetAllAssets() ([]Asset, error) {
result, err := atb.contract.Evaluate("GetAllAssets") assetsRaw, err := atb.contract.Evaluate("GetAllAssets")
if err != nil { if err != nil {
return nil, fmt.Errorf("in GetAllAssets: %w", err) return nil, err
} }
return result, nil
if len(assetsRaw) == 0 {
return []Asset{}, nil
}
var assets []Asset
if err := json.Unmarshal(assetsRaw, &assets); err != nil {
return nil, err
}
return assets, nil
} }

View file

@ -1,8 +1,3 @@
/*
* Copyright 2024 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package contract package contract
type Asset struct { type Asset struct {

View file

@ -1,13 +1,6 @@
/*
* Copyright 2024 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
atb "offchaindata/contract" atb "offchaindata/contract"
@ -16,11 +9,11 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func getAllAssets(clientConnection *grpc.ClientConn) { func getAllAssets(clientConnection grpc.ClientConnInterface) error {
id, options := newConnectOptions(clientConnection) id, options := newConnectOptions(clientConnection)
gateway, err := client.Connect(id, options...) gateway, err := client.Connect(id, options...)
if err != nil { if err != nil {
panic(err) return err
} }
defer gateway.Close() defer gateway.Close()
@ -28,21 +21,15 @@ func getAllAssets(clientConnection *grpc.ClientConn) {
smartContract := atb.NewAssetTransferBasic(contract) smartContract := atb.NewAssetTransferBasic(contract)
assets, err := smartContract.GetAllAssets() assets, err := smartContract.GetAllAssets()
if err != nil { if err != nil {
panic(err) return err
} }
if len(assets) == 0 { assetsJSONformatted, err := json.MarshalIndent(assets, "", " ")
fmt.Println("no assets") if err != nil {
return return err
} }
fmt.Println(formatJSON(assets)) fmt.Println(assetsJSONformatted)
}
func formatJSON(data []byte) string { return nil
var result bytes.Buffer
if err := json.Indent(&result, data, "", " "); err != nil {
panic(fmt.Errorf("failed to parse JSON: %w", err))
}
return result.String()
} }

View file

@ -14,11 +14,11 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func listen(clientConnection *grpc.ClientConn) { func listen(clientConnection grpc.ClientConnInterface) error {
id, options := newConnectOptions(clientConnection) id, options := newConnectOptions(clientConnection)
gateway, err := client.Connect(id, options...) gateway, err := client.Connect(id, options...)
if err != nil { if err != nil {
panic(err) return err
} }
defer func() { defer func() {
gateway.Close() gateway.Close()
@ -28,7 +28,7 @@ func listen(clientConnection *grpc.ClientConn) {
checkpointFile := envOrDefault("CHECKPOINT_FILE", "checkpoint.json") checkpointFile := envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
checkpointer, err := client.NewFileCheckpointer(checkpointFile) checkpointer, err := client.NewFileCheckpointer(checkpointFile)
if err != nil { if err != nil {
panic(err) return err
} }
defer func() { defer func() {
checkpointer.Close() checkpointer.Close()
@ -57,7 +57,7 @@ func listen(clientConnection *grpc.ClientConn) {
client.WithCheckpoint(checkpointer), client.WithCheckpoint(checkpointer),
) )
if err != nil { if err != nil {
panic(err) return err
} }
for blockProto := range blocks { for blockProto := range blocks {
@ -69,12 +69,12 @@ func listen(clientConnection *grpc.ClientConn) {
} }
if err := aBlockProcessor.process(); err != nil { if err := aBlockProcessor.process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err) return err
return
} }
} }
fmt.Println("\nShutting down listener gracefully...") fmt.Println("\nShutting down listener gracefully...")
return nil
} }
type blockProcessor struct { type blockProcessor struct {
@ -85,39 +85,37 @@ type blockProcessor struct {
} }
func (b *blockProcessor) process() error { func (b *blockProcessor) process() error {
funcName := "Process"
fmt.Println("\nReceived block", b.parsedBlock.Number()) fmt.Println("\nReceived block", b.parsedBlock.Number())
validTransactions, err := b.validTransactions() validTransactions, err := b.validTransactions()
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
for _, validTransaction := range validTransactions { for _, validTransaction := range validTransactions {
aTransaction := transactionProcessor{ txProcessor := transactionProcessor{
b.parsedBlock.Number(), b.parsedBlock.Number(),
validTransaction, validTransaction,
// TODO use pointer to parent and get blockNumber, store and channelName from parent // TODO use reference to parent and get blockNumber, store and channelName from parent
b.writeToStore, b.writeToStore,
b.channelName, b.channelName,
} }
if err := aTransaction.process(); err != nil { if err := txProcessor.process(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
channelHeader, err := validTransaction.ChannelHeader() channelHeader, err := validTransaction.ChannelHeader()
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
transactionID := channelHeader.GetTxId() transactionID := channelHeader.GetTxId()
if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil { if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
} }
if err := b.checkpointer.CheckpointBlock(b.parsedBlock.Number()); err != nil { if err := b.checkpointer.CheckpointBlock(b.parsedBlock.Number()); err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
return nil return nil
@ -127,7 +125,7 @@ func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) {
result := []*parser.Transaction{} result := []*parser.Transaction{}
newTransactions, err := b.getNewTransactions() newTransactions, err := b.getNewTransactions()
if err != nil { if err != nil {
return nil, fmt.Errorf("in validTransactions: %w", err) return nil, err
} }
for _, transaction := range newTransactions { for _, transaction := range newTransactions {
@ -139,11 +137,9 @@ func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) {
} }
func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) { func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) {
funcName := "getNewTransactions"
transactions, err := b.parsedBlock.Transactions() transactions, err := b.parsedBlock.Transactions()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
lastTransactionID := b.checkpointer.TransactionID() lastTransactionID := b.checkpointer.TransactionID()
@ -155,24 +151,22 @@ func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) {
// Ignore transactions up to the last processed transaction ID // Ignore transactions up to the last processed transaction ID
lastProcessedIndex, err := b.findLastProcessedIndex() lastProcessedIndex, err := b.findLastProcessedIndex()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
return transactions[lastProcessedIndex+1:], nil return transactions[lastProcessedIndex+1:], nil
} }
func (b *blockProcessor) findLastProcessedIndex() (int, error) { func (b *blockProcessor) findLastProcessedIndex() (int, error) {
funcName := "findLastProcessedIndex"
transactions, err := b.parsedBlock.Transactions() transactions, err := b.parsedBlock.Transactions()
if err != nil { if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err) return 0, err
} }
blockTransactionIDs := []string{} blockTransactionIDs := []string{}
for _, transaction := range transactions { for _, transaction := range transactions {
channelHeader, err := transaction.ChannelHeader() channelHeader, err := transaction.ChannelHeader()
if err != nil { if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err) return 0, err
} }
blockTransactionIDs = append(blockTransactionIDs, channelHeader.GetTxId()) blockTransactionIDs = append(blockTransactionIDs, channelHeader.GetTxId())
} }
@ -204,17 +198,15 @@ type transactionProcessor struct {
} }
func (t *transactionProcessor) process() error { func (t *transactionProcessor) process() error {
funcName := "process"
channelHeader, err := t.transaction.ChannelHeader() channelHeader, err := t.transaction.ChannelHeader()
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
transactionID := channelHeader.GetTxId() transactionID := channelHeader.GetTxId()
writes, err := t.writes() writes, err := t.writes()
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
if len(writes) == 0 { if len(writes) == 0 {
@ -229,25 +221,24 @@ func (t *transactionProcessor) process() error {
TransactionID: transactionID, TransactionID: transactionID,
Writes: writes, Writes: writes,
}); err != nil { }); err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
return nil return nil
} }
func (t *transactionProcessor) writes() ([]write, error) { func (t *transactionProcessor) writes() ([]write, error) {
funcName := "writes"
// TODO this entire code should live in the parser and just return the kvWrite which // TODO this entire code should live in the parser and just return the kvWrite which
// we then map to write and return // we then map to write and return
channelHeader, err := t.transaction.ChannelHeader() channelHeader, err := t.transaction.ChannelHeader()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
t.channelName = channelHeader.GetChannelId() t.channelName = channelHeader.GetChannelId()
nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{} nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{}
@ -263,7 +254,7 @@ func (t *transactionProcessor) writes() ([]write, error) {
kvReadWriteSet, err := readWriteSet.ReadWriteSet() kvReadWriteSet, err := readWriteSet.ReadWriteSet()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
for _, kvWrite := range kvReadWriteSet.GetWrites() { for _, kvWrite := range kvReadWriteSet.GetWrites() {

View file

@ -1,7 +1,7 @@
package parser package parser
import ( import (
"fmt" "sync"
"github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/common"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -9,11 +9,13 @@ import (
type Block struct { type Block struct {
block *common.Block block *common.Block
cachedTransactions []*Transaction transactions func() ([]*Transaction, error)
} }
func ParseBlock(block *common.Block) *Block { func ParseBlock(block *common.Block) *Block {
return &Block{block, nil} result := &Block{block, nil}
result.transactions = sync.OnceValues(result.unmarshalTransactions)
return result
} }
func (b *Block) Number() uint64 { func (b *Block) Number() uint64 {
@ -21,37 +23,34 @@ func (b *Block) Number() uint64 {
} }
func (b *Block) Transactions() ([]*Transaction, error) { func (b *Block) Transactions() ([]*Transaction, error) {
if b.cachedTransactions != nil { return b.transactions()
return b.cachedTransactions, nil
} }
funcName := "Transactions" func (b *Block) unmarshalTransactions() ([]*Transaction, error) {
envelopes, err := b.unmarshalEnvelopesFromBlockData() envelopes, err := b.unmarshalEnvelopes()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
commonPayloads, err := b.unmarshalPayloadsFrom(envelopes) commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
payloads, err := b.parse(commonPayloads) payloads, err := b.parse(commonPayloads)
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
b.cachedTransactions = b.createTransactionsFrom(payloads) return b.createTransactionsFrom(payloads), nil
return b.cachedTransactions, nil
} }
func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) { func (b *Block) unmarshalEnvelopes() ([]*common.Envelope, error) {
result := []*common.Envelope{} result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() { for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{} envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil { if err := proto.Unmarshal(blockData, envelope); err != nil {
return nil, fmt.Errorf("in unmarshalEnvelopesFromBlockData: %w", err) return nil, err
} }
result = append(result, envelope) result = append(result, envelope)
} }
@ -63,7 +62,7 @@ func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Pay
for _, envelope := range envelopes { for _, envelope := range envelopes {
commonPayload := &common.Payload{} commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
return nil, fmt.Errorf("in unmarshalPayloadsFrom: %w", err) return nil, err
} }
result = append(result, commonPayload) result = append(result, commonPayload)
} }
@ -71,8 +70,6 @@ func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Pay
} }
func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) { func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
funcName := "parse"
validationCodes := b.block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER] validationCodes := b.block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER]
result := []*payload{} result := []*payload{}
@ -82,7 +79,7 @@ func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
payload := parsePayload(commonPayload, int32(statusCode)) payload := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransaction() is, err := payload.isEndorserTransaction()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
if is { if is {
result = append(result, payload) result = append(result, payload)

View file

@ -10,7 +10,6 @@ import (
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric-protos-go-apiv2/peer" "github.com/hyperledger/fabric-protos-go-apiv2/peer"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
) )
func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) { func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) {
@ -128,7 +127,7 @@ func nsReadWriteSetFake() (*rwset.NsReadWriteSet, string, atb.Asset) {
return result, expectedNamespace, expectedAsset return result, expectedNamespace, expectedAsset
} }
func protoMarshalOrPanic(v protoreflect.ProtoMessage) []byte { func protoMarshalOrPanic(v proto.Message) []byte {
result, err := proto.Marshal(v) result, err := proto.Marshal(v)
if err != nil { if err != nil {
panic(err) panic(err)

View file

@ -1,7 +1,7 @@
package parser package parser
import ( import (
"fmt" "sync"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
"github.com/hyperledger/fabric-protos-go-apiv2/peer" "github.com/hyperledger/fabric-protos-go-apiv2/peer"
@ -10,48 +10,42 @@ import (
type endorserTransaction struct { type endorserTransaction struct {
transaction *peer.Transaction transaction *peer.Transaction
cachedReadWriteSets []*readWriteSet readWriteSets func() ([]*readWriteSet, error)
} }
func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransaction { func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransaction {
return &endorserTransaction{transaction, nil} result := &endorserTransaction{transaction, nil}
} result.readWriteSets = sync.OnceValues(result.unmarshalReadWriteSets)
return result
func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) {
funcName := "readWriteSets"
if p.cachedReadWriteSets != nil {
return p.cachedReadWriteSets, nil
} }
func (p *endorserTransaction) unmarshalReadWriteSets() ([]*readWriteSet, error) {
chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads() chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads) chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads)
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions) proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions)
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads) chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads)
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions) txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions)
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
p.cachedReadWriteSets = p.parseReadWriteSets(txReadWriteSets) return p.parseReadWriteSets(txReadWriteSets), nil
return p.cachedReadWriteSets, nil
} }
func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) { func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) {
@ -59,7 +53,7 @@ func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.Chainc
for _, transactionAction := range p.transaction.GetActions() { for _, transactionAction := range p.transaction.GetActions() {
chaincodeActionPayload := &peer.ChaincodeActionPayload{} chaincodeActionPayload := &peer.ChaincodeActionPayload{}
if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil { if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil {
return nil, fmt.Errorf("in unmarshalChaincodeActionPayloads: %w", err) return nil, err
} }
result = append(result, chaincodeActionPayload) result = append(result, chaincodeActionPayload)
@ -80,7 +74,7 @@ func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndor
for _, endorsedAction := range chaincodeEndorsedActions { for _, endorsedAction := range chaincodeEndorsedActions {
proposalResponsePayload := &peer.ProposalResponsePayload{} proposalResponsePayload := &peer.ProposalResponsePayload{}
if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil { if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil {
return nil, fmt.Errorf("in unmarshalProposalResponsePayloadsFrom: %w", err) return nil, err
} }
result = append(result, proposalResponsePayload) result = append(result, proposalResponsePayload)
} }
@ -92,7 +86,7 @@ func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloa
for _, proposalResponsePayload := range proposalResponsePayloads { for _, proposalResponsePayload := range proposalResponsePayloads {
chaincodeAction := &peer.ChaincodeAction{} chaincodeAction := &peer.ChaincodeAction{}
if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil { if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil {
return nil, fmt.Errorf("in unmarshalChaincodeActionsFrom: %w", err) return nil, err
} }
result = append(result, chaincodeAction) result = append(result, chaincodeAction)
} }
@ -104,7 +98,7 @@ func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*pee
for _, chaincodeAction := range chaincodeActions { for _, chaincodeAction := range chaincodeActions {
txReadWriteSet := &rwset.TxReadWriteSet{} txReadWriteSet := &rwset.TxReadWriteSet{}
if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil { if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil {
return nil, fmt.Errorf("in unmarshalTxReadWriteSetsFrom: %w", err) return nil, err
} }
result = append(result, txReadWriteSet) result = append(result, txReadWriteSet)
} }

View file

@ -1,7 +1,7 @@
package parser package parser
import ( import (
"fmt" "sync"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset"
@ -10,11 +10,13 @@ import (
type NamespaceReadWriteSet struct { type NamespaceReadWriteSet struct {
nsReadWriteSet *rwset.NsReadWriteSet nsReadWriteSet *rwset.NsReadWriteSet
cachedReadWriteSet *kvrwset.KVRWSet readWriteSet func() (*kvrwset.KVRWSet, error)
} }
func parseNamespaceReadWriteSet(nsRwSet *rwset.NsReadWriteSet) *NamespaceReadWriteSet { func parseNamespaceReadWriteSet(nsRwSet *rwset.NsReadWriteSet) *NamespaceReadWriteSet {
return &NamespaceReadWriteSet{nsRwSet, nil} result := &NamespaceReadWriteSet{nsRwSet, nil}
result.readWriteSet = sync.OnceValues(result.unmarshalReadWriteSet)
return result
} }
func (p *NamespaceReadWriteSet) Namespace() string { func (p *NamespaceReadWriteSet) Namespace() string {
@ -22,14 +24,14 @@ func (p *NamespaceReadWriteSet) Namespace() string {
} }
func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) { func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) {
if p.cachedReadWriteSet != nil { return p.readWriteSet()
return p.cachedReadWriteSet, nil
} }
p.cachedReadWriteSet = &kvrwset.KVRWSet{} func (p *NamespaceReadWriteSet) unmarshalReadWriteSet() (*kvrwset.KVRWSet, error) {
if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), p.cachedReadWriteSet); err != nil { result := &kvrwset.KVRWSet{}
return nil, fmt.Errorf("in ReadWriteSet: %w", err) if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), result); err != nil {
return nil, err
} }
return p.cachedReadWriteSet, nil return result, nil
} }

View file

@ -2,6 +2,7 @@ package parser
import ( import (
"fmt" "fmt"
"sync"
"github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/hyperledger/fabric-protos-go-apiv2/peer" "github.com/hyperledger/fabric-protos-go-apiv2/peer"
@ -11,44 +12,40 @@ import (
type payload struct { type payload struct {
commonPayload *common.Payload commonPayload *common.Payload
statusCode int32 statusCode int32
cachedChannelHeader *common.ChannelHeader channelHeader func() (*common.ChannelHeader, error)
} }
func parsePayload(commonPayload *common.Payload, statusCode int32) *payload { func parsePayload(commonPayload *common.Payload, statusCode int32) *payload {
return &payload{commonPayload, statusCode, nil} result := &payload{commonPayload, statusCode, nil}
result.channelHeader = sync.OnceValues(result.unmarshalChannelHeader)
return result
} }
func (p *payload) channelHeader() (*common.ChannelHeader, error) { func (p *payload) unmarshalChannelHeader() (*common.ChannelHeader, error) {
if p.cachedChannelHeader != nil { result := &common.ChannelHeader{}
return p.cachedChannelHeader, nil if err := proto.Unmarshal(p.commonPayload.GetHeader().GetChannelHeader(), result); err != nil {
return nil, err
} }
p.cachedChannelHeader = &common.ChannelHeader{} return result, nil
if err := proto.Unmarshal(p.commonPayload.GetHeader().GetChannelHeader(), p.cachedChannelHeader); err != nil {
return nil, fmt.Errorf("in channelHeader: %w", err)
}
return p.cachedChannelHeader, nil
} }
func (p *payload) endorserTransaction() (*endorserTransaction, error) { func (p *payload) endorserTransaction() (*endorserTransaction, error) {
funcName := "endorserTransaction"
is, err := p.isEndorserTransaction() is, err := p.isEndorserTransaction()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
if !is { if !is {
channelHeader, err := p.channelHeader() channelHeader, err := p.channelHeader()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType()) return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType())
} }
result := &peer.Transaction{} result := &peer.Transaction{}
if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil { if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
return parseEndorserTransaction(result), nil return parseEndorserTransaction(result), nil
@ -57,7 +54,7 @@ func (p *payload) endorserTransaction() (*endorserTransaction, error) {
func (p *payload) isEndorserTransaction() (bool, error) { func (p *payload) isEndorserTransaction() (bool, error) {
channelHeader, err := p.channelHeader() channelHeader, err := p.channelHeader()
if err != nil { if err != nil {
return false, fmt.Errorf("in isEndorserTransaction: %w", err) return false, err
} }
return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil

View file

@ -1,8 +1,6 @@
package parser package parser
import ( import (
"fmt"
"github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/common"
) )
@ -19,16 +17,14 @@ func (t *Transaction) ChannelHeader() (*common.ChannelHeader, error) {
} }
func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) { func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) {
funcName := "NamespaceReadWriteSets"
endorserTransaction, err := t.payload.endorserTransaction() endorserTransaction, err := t.payload.endorserTransaction()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
txReadWriteSets, err := endorserTransaction.readWriteSets() txReadWriteSets, err := endorserTransaction.readWriteSets()
if err != nil { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err) return nil, err
} }
result := []*NamespaceReadWriteSet{} result := []*NamespaceReadWriteSet{}

View file

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -41,17 +40,15 @@ type write struct {
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file. // This implementation just writes to a file.
func applyWritesToOffChainStore(data ledgerUpdate) error { func applyWritesToOffChainStore(data ledgerUpdate) error {
funcName := "applyWritesToOffChainStore"
if err := simulateFailureIfRequired(); err != nil { if err := simulateFailureIfRequired(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
writes := []string{} writes := []string{}
for _, write := range data.Writes { for _, write := range data.Writes {
marshaled, err := json.Marshal(write) marshaled, err := json.Marshal(write)
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
writes = append(writes, string(marshaled)) writes = append(writes, string(marshaled))
@ -59,16 +56,16 @@ func applyWritesToOffChainStore(data ledgerUpdate) error {
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
f.Close() f.Close()
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
return nil return nil
@ -87,13 +84,8 @@ func simulateFailureIfRequired() error {
func getSimulatedFailureCount() uint { func getSimulatedFailureCount() uint {
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) result, err := strconv.ParseUint(valueAsString, 10, 0)
if err != nil { if err != nil {
panic(err)
}
result := math.Floor(valueAsFloat)
if valueAsFloat < 0 {
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
} }

View file

@ -1,9 +1,9 @@
package main package main
import ( import (
"crypto/rand" "context"
"fmt" "fmt"
"math/big" "math/rand/v2"
"sync" "sync"
atb "offchaindata/contract" atb "offchaindata/contract"
@ -15,11 +15,11 @@ import (
var owners = []string{"alice", "bob", "charlie"} var owners = []string{"alice", "bob", "charlie"}
func transact(clientConnection *grpc.ClientConn) { func transact(clientConnection grpc.ClientConnInterface) error {
id, options := newConnectOptions(clientConnection) id, options := newConnectOptions(clientConnection)
gateway, err := client.Connect(id, options...) gateway, err := client.Connect(id, options...)
if err != nil { if err != nil {
panic((err)) return err
} }
defer func() { defer func() {
gateway.Close() gateway.Close()
@ -30,7 +30,11 @@ func transact(clientConnection *grpc.ClientConn) {
smartContract := atb.NewAssetTransferBasic(contract) smartContract := atb.NewAssetTransferBasic(contract)
app := newTransactApp(smartContract) app := newTransactApp(smartContract)
app.run() if err := app.run(); err != nil {
return err
}
return nil
} }
type transactApp struct { type transactApp struct {
@ -42,90 +46,94 @@ func newTransactApp(smartContract *atb.AssetTransferBasic) *transactApp {
return &transactApp{smartContract, 10} return &transactApp{smartContract, 10}
} }
func (t *transactApp) run() { func (t *transactApp) run() error {
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < t.batchSize; i++ { for range t.batchSize {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := t.transact(); err != nil { select {
fmt.Println("\033[31m[ERROR]\033[0m", err) case <-ctx.Done():
return return
default:
if err := t.transact(); err != nil {
cancel(err)
return
}
} }
}() }()
} }
wg.Wait() wg.Wait()
if err := context.Cause(ctx); err != nil {
return err
}
return nil
} }
func (t *transactApp) transact() error { func (t *transactApp) transact() error {
funcName := "transact" anAsset, err := newAsset()
anAsset := newAsset()
err := t.smartContract.CreateAsset(anAsset)
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
}
if err := t.smartContract.CreateAsset(anAsset); err != nil {
return err
} }
fmt.Println("Created asset", anAsset.ID) fmt.Println("Created asset", anAsset.ID)
// Transfer randomly 1 in 2 assets to a new owner. // Transfer randomly 1 in 2 assets to a new owner.
if randomInt(2) == 0 { if rand.N(2) == 0 {
newOwner := differentElement(owners, anAsset.Owner) newOwner := differentElement(owners, anAsset.Owner)
oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner) oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner)
if err != nil { if err != nil {
return fmt.Errorf("in %s: %w", funcName, err) return err
} }
fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner) fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner)
} }
// Delete randomly 1 in 4 created assets. // Delete randomly 1 in 4 created assets.
if randomInt(4) == 0 { if rand.N(4) == 0 {
err := t.smartContract.DeleteAsset(anAsset.ID) if err := t.smartContract.DeleteAsset(anAsset.ID); err != nil {
if err != nil { return err
return fmt.Errorf("in %s: %w", funcName, err)
} }
fmt.Println("Deleted asset", anAsset.ID) fmt.Println("Deleted asset", anAsset.ID)
} }
return nil return nil
} }
func newAsset() atb.Asset { func newAsset() (atb.Asset, error) {
id, err := uuid.NewRandom() id, err := uuid.NewRandom()
if err != nil { if err != nil {
panic(err) return atb.Asset{}, err
} }
return atb.Asset{ return atb.Asset{
ID: id.String(), ID: id.String(),
Color: randomElement([]string{"red", "green", "blue"}), Color: randomElement([]string{"red", "green", "blue"}),
Size: uint64(randomInt(10) + 1), Size: uint64(rand.N(10) + 1),
Owner: randomElement(owners), Owner: randomElement(owners),
AppraisedValue: uint64(randomInt(1000) + 1), AppraisedValue: uint64(rand.N(1000) + 1),
} }, nil
} }
// Pick a random element from an array. // Pick a random element from an array.
func randomElement(values []string) string { func randomElement(values []string) string {
result := values[randomInt(len(values))] result := values[rand.N(len(values))]
return result return result
} }
// Generate a random integer in the range 0 to max - 1.
func randomInt(max int) int {
result, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
if err != nil {
panic(err)
}
return int(result.Int64())
}
// Pick a random element from an array, excluding the current value. // Pick a random element from an array, excluding the current value.
func differentElement(values []string, currentValue string) string { func differentElement(values []string, currentValue string) string {
candidateValues := []string{} var candidateValues []string
for _, v := range values { for _, v := range values {
if v != currentValue { if v != currentValue {
candidateValues = append(candidateValues, v) candidateValues = append(candidateValues, v)