From 06c7445e9109e1ed1dbdbad248ae2f859d513779 Mon Sep 17 00:00:00 2001 From: Stanislav Jakuschevskij Date: Sat, 4 Jan 2025 13:26:25 +0100 Subject: [PATCH] Replace panic with error handling Starting from the processor.Block.Process all methods now return errors if something goes wrong with unpacking of the blocks and reading the transactions. In each function where the error is being propagated back to client it is wrapped in a message with the function name. This makes it easier to track down the error and see the propagation chain. Finally the error is logged to the terminal and the go routine shuts down gracefully. The graceful shutdown executes all deferred functions which close the context, the checkpointer and the gateway. Before panics were used everywhere which was an issue because the unpacking of the blocks happened in a go routine. When a panic happens in a go routine only the deferred functions of the go routine are called but not those of the client which lead to unexpected behavior. The transact function is also executed in a go routine therefore the same typo of error handling was implemented there. Signed-off-by: Stanislav Jakuschevskij --- .../application-go/contract/contract.go | 23 +++-- off_chain_data/application-go/getAllAssets.go | 5 +- off_chain_data/application-go/listen.go | 4 +- off_chain_data/application-go/parser/block.go | 86 ++++++++++------ .../application-go/parser/block_test.go | 16 ++- .../parser/endorserTransaction.go | 74 +++++++++----- .../parser/namespaceReadWriteSet.go | 9 +- .../application-go/parser/payload.go | 44 ++++++--- .../application-go/parser/transaction.go | 22 ++++- .../application-go/processor/block.go | 99 ++++++++++++------- .../application-go/processor/transaction.go | 42 ++++++-- .../processor/txIdNotFoundError.go | 32 ++++++ .../application-go/store/flatFille.go | 15 +-- off_chain_data/application-go/transact.go | 31 ++++-- off_chain_data/application-go/utils/utils.go | 28 ++++-- .../application-go/utils/utils_test.go | 64 +++++++++--- 16 files changed, 422 insertions(+), 172 deletions(-) create mode 100644 off_chain_data/application-go/processor/txIdNotFoundError.go diff --git a/off_chain_data/application-go/contract/contract.go b/off_chain_data/application-go/contract/contract.go index e672620d..59e27135 100644 --- a/off_chain_data/application-go/contract/contract.go +++ b/off_chain_data/application-go/contract/contract.go @@ -6,6 +6,7 @@ package contract import ( + "fmt" "strconv" "github.com/hyperledger/fabric-gateway/pkg/client" @@ -19,7 +20,7 @@ func NewAssetTransferBasic(contract *client.Contract) *AssetTransferBasic { return &AssetTransferBasic{contract} } -func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) { +func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) error { if _, err := atb.contract.Submit( "CreateAsset", client.WithArguments( @@ -29,11 +30,12 @@ func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) { anAsset.Owner, strconv.FormatUint(anAsset.AppraisedValue, 10), )); err != nil { - panic(err) + return fmt.Errorf("in CreateAsset: %w", err) } + return nil } -func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) string { +func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) (string, error) { result, err := atb.contract.Submit( "TransferAsset", client.WithArguments( @@ -42,27 +44,28 @@ func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) string { ), ) if err != nil { - panic(err) + return "", fmt.Errorf("in TransferAsset: %w", err) } - return string(result) + return string(result), nil } -func (atb *AssetTransferBasic) DeleteAsset(id string) { +func (atb *AssetTransferBasic) DeleteAsset(id string) error { if _, err := atb.contract.Submit( "DeleteAsset", client.WithArguments( id, ), ); err != nil { - panic(err) + return fmt.Errorf("in DeleteAsset: %w", err) } + return nil } -func (atb *AssetTransferBasic) GetAllAssets() []byte { +func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) { result, err := atb.contract.Evaluate("GetAllAssets") if err != nil { - panic(err) + return []byte{}, fmt.Errorf("in GetAllAssets: %w", err) } - return result + return result, nil } diff --git a/off_chain_data/application-go/getAllAssets.go b/off_chain_data/application-go/getAllAssets.go index 558b19ed..c9cca4da 100644 --- a/off_chain_data/application-go/getAllAssets.go +++ b/off_chain_data/application-go/getAllAssets.go @@ -26,7 +26,10 @@ func getAllAssets(clientConnection *grpc.ClientConn) { contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) smartContract := atb.NewAssetTransferBasic(contract) - assets := smartContract.GetAllAssets() + assets, err := smartContract.GetAllAssets() + if err != nil { + panic((err)) + } fmt.Println(formatJSON(assets)) } diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index d2b99ded..af311626 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -80,8 +80,8 @@ func listen(clientConnection *grpc.ClientConn) { channelName, ) - if err := blockProcessor.Process(); err == store.ErrExpected { - fmt.Println(err) + if err := blockProcessor.Process(); err != nil { + fmt.Println("\033[31m[ERROR]\033[0m", err) return } } diff --git a/off_chain_data/application-go/parser/block.go b/off_chain_data/application-go/parser/block.go index 8a1c0f19..ec105eea 100644 --- a/off_chain_data/application-go/parser/block.go +++ b/off_chain_data/application-go/parser/block.go @@ -14,74 +14,102 @@ type Block struct { } func ParseBlock(block *common.Block) *Block { - return &Block{block, nil} + return &Block{block, []*Transaction{}} } -func (b *Block) Number() uint64 { - header := utils.AssertDefined(b.block.GetHeader(), "missing block header") - return header.GetNumber() +func (b *Block) Number() (uint64, error) { + header, err := utils.AssertDefined(b.block.GetHeader(), "missing block header") + if err != nil { + return 0, fmt.Errorf("in Number: %w", err) + } + return header.GetNumber(), nil } -func (b *Block) Transactions() []*Transaction { - return utils.Cache(func() []*Transaction { - envelopes := b.unmarshalEnvelopesFromBlockData() +func (b *Block) Transactions() ([]*Transaction, error) { + return utils.Cache(func() ([]*Transaction, error) { + funcName := "Transactions" + envelopes, err := b.unmarshalEnvelopesFromBlockData() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - commonPayloads := b.unmarshalPayloadsFrom(envelopes) + commonPayloads, err := b.unmarshalPayloadsFrom(envelopes) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - payloads := b.parse(commonPayloads) + payloads, err := b.parse(commonPayloads) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - return b.createTransactionsFrom(payloads) + return b.createTransactionsFrom(payloads), nil })() } -func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope { +func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) { result := []*common.Envelope{} for _, blockData := range b.block.GetData().GetData() { envelope := &common.Envelope{} if err := proto.Unmarshal(blockData, envelope); err != nil { - panic(err) + return nil, fmt.Errorf("in unmarshalEnvelopesFromBlockData: %w", err) } result = append(result, envelope) } - return result + return result, nil } -func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload { +func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Payload, error) { result := []*common.Payload{} for _, envelope := range envelopes { commonPayload := &common.Payload{} if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { - panic(err) + return nil, fmt.Errorf("in unmarshalPayloadsFrom: %w", err) } result = append(result, commonPayload) } - return result + return result, nil } -func (b *Block) parse(commonPayloads []*common.Payload) []*payload { - validationCodes := b.extractTransactionValidationCodes() +func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) { + funcName := "parse" + + validationCodes, err := b.extractTransactionValidationCodes() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + result := []*payload{} for i, commonPayload := range commonPayloads { - payload := parsePayload( - commonPayload, - int32(utils.AssertDefined( - validationCodes[i], - fmt.Sprint("missing validation code index", i), - ), - ), + statusCode, err := utils.AssertDefined( + validationCodes[i], + fmt.Sprint("missing validation code index", i), ) - if payload.isEndorserTransaction() { + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + payload := parsePayload(commonPayload, int32(statusCode)) + is, err := payload.isEndorserTransaction() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + if is { result = append(result, payload) } } - return result + + return result, nil } -func (b *Block) extractTransactionValidationCodes() []byte { - metadata := utils.AssertDefined( +func (b *Block) extractTransactionValidationCodes() ([]byte, error) { + metadata, err := utils.AssertDefined( b.block.GetMetadata(), "missing block metadata", ) + if err != nil { + return nil, fmt.Errorf("in extractTransactionValidationCodes: %w", err) + } return utils.AssertDefined( metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER], diff --git a/off_chain_data/application-go/parser/block_test.go b/off_chain_data/application-go/parser/block_test.go index 838af348..b8546adf 100644 --- a/off_chain_data/application-go/parser/block_test.go +++ b/off_chain_data/application-go/parser/block_test.go @@ -35,12 +35,17 @@ func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) { } parsedEndorserTransaction := parseEndorserTransaction(transaction) - if len(parsedEndorserTransaction.readWriteSets()) != 1 { - t.Fatal("expected 1 ReadWriteSet, got", len(parsedEndorserTransaction.readWriteSets())) + readWriteSets, err := parsedEndorserTransaction.readWriteSets() + if err != nil { + t.Fatal("unexpected error:", err) + } + + if len(readWriteSets) != 1 { + t.Fatal("expected 1 ReadWriteSet, got", len(readWriteSets)) } assertReadWriteSet( - parsedEndorserTransaction.readWriteSets()[0].namespaceReadWriteSets()[0], + readWriteSets[0].namespaceReadWriteSets()[0], expectedNamespace, expectedAsset, t, @@ -57,7 +62,10 @@ func assertReadWriteSet( t.Errorf("expected namespace %s, got %s", expectedNamespace, parsedNsRwSet.Namespace()) } - actualKVRWSet := parsedNsRwSet.ReadWriteSet() + actualKVRWSet, err := parsedNsRwSet.ReadWriteSet() + if err != nil { + t.Fatal("unexpected error:", err) + } if len(actualKVRWSet.Writes) != 1 { t.Fatal("expected 1 write, got", len(actualKVRWSet.Writes)) } diff --git a/off_chain_data/application-go/parser/endorserTransaction.go b/off_chain_data/application-go/parser/endorserTransaction.go index d3653979..161775d6 100644 --- a/off_chain_data/application-go/parser/endorserTransaction.go +++ b/off_chain_data/application-go/parser/endorserTransaction.go @@ -1,6 +1,7 @@ package parser import ( + "fmt" "offChainData/utils" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" @@ -16,83 +17,104 @@ func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransactio return &endorserTransaction{transaction} } -func (p *endorserTransaction) readWriteSets() []*readWriteSet { - return utils.Cache(func() []*readWriteSet { - chaincodeActionPayloads := p.unmarshalChaincodeActionPayloads() +func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) { + return utils.Cache(func() ([]*readWriteSet, error) { + funcName := "readWriteSets" + chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - chaincodeEndorsedActions := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads) + chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - proposalResponsePayloads := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions) + proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - chaincodeActions := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads) + chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - txReadWriteSets := p.unmarshalTxReadWriteSetsFrom(chaincodeActions) + txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions) + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } - return p.parseReadWriteSets(txReadWriteSets) + return p.parseReadWriteSets(txReadWriteSets), nil })() } -func (p *endorserTransaction) unmarshalChaincodeActionPayloads() []*peer.ChaincodeActionPayload { +func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) { result := []*peer.ChaincodeActionPayload{} for _, transactionAction := range p.transaction.GetActions() { chaincodeActionPayload := &peer.ChaincodeActionPayload{} if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil { - panic(err) + return nil, fmt.Errorf("in unmarshalChaincodeActionPayloads: %w", err) } result = append(result, chaincodeActionPayload) } - return result + return result, nil } -func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) []*peer.ChaincodeEndorsedAction { +func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) { result := []*peer.ChaincodeEndorsedAction{} for _, payload := range chaincodeActionPayloads { + chaincodeEndorsedAction, err := utils.AssertDefined( + payload.GetAction(), + "missing chaincode endorsed action", + ) + if err != nil { + return nil, fmt.Errorf("in extractChaincodeEndorsedActionsFrom: %w", err) + } + result = append( result, - utils.AssertDefined( - payload.GetAction(), - "missing chaincode endorsed action", - ), + chaincodeEndorsedAction, ) } - return result + return result, nil } -func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) []*peer.ProposalResponsePayload { +func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) ([]*peer.ProposalResponsePayload, error) { result := []*peer.ProposalResponsePayload{} for _, endorsedAction := range chaincodeEndorsedActions { proposalResponsePayload := &peer.ProposalResponsePayload{} if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil { - panic(err) + return nil, fmt.Errorf("in unmarshalProposalResponsePayloadsFrom: %w", err) } result = append(result, proposalResponsePayload) } - return result + return result, nil } -func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) []*peer.ChaincodeAction { +func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) ([]*peer.ChaincodeAction, error) { result := []*peer.ChaincodeAction{} for _, proposalResponsePayload := range proposalResponsePayloads { chaincodeAction := &peer.ChaincodeAction{} if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil { - panic(err) + return nil, fmt.Errorf("in unmarshalChaincodeActionsFrom: %w", err) } result = append(result, chaincodeAction) } - return result + return result, nil } -func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) []*rwset.TxReadWriteSet { +func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) ([]*rwset.TxReadWriteSet, error) { result := []*rwset.TxReadWriteSet{} for _, chaincodeAction := range chaincodeActions { txReadWriteSet := &rwset.TxReadWriteSet{} if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil { - continue + return nil, fmt.Errorf("in unmarshalTxReadWriteSetsFrom: %w", err) } result = append(result, txReadWriteSet) } - return result + return result, nil } func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet { diff --git a/off_chain_data/application-go/parser/namespaceReadWriteSet.go b/off_chain_data/application-go/parser/namespaceReadWriteSet.go index 341571ec..4fdd8a91 100644 --- a/off_chain_data/application-go/parser/namespaceReadWriteSet.go +++ b/off_chain_data/application-go/parser/namespaceReadWriteSet.go @@ -1,6 +1,7 @@ package parser import ( + "fmt" "offChainData/utils" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" @@ -20,13 +21,13 @@ func (p *NamespaceReadWriteSet) Namespace() string { return p.nsReadWriteSet.GetNamespace() } -func (p *NamespaceReadWriteSet) ReadWriteSet() *kvrwset.KVRWSet { - return utils.Cache(func() *kvrwset.KVRWSet { +func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) { + return utils.Cache(func() (*kvrwset.KVRWSet, error) { result := kvrwset.KVRWSet{} if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil { - panic(err) + return nil, fmt.Errorf("in ReadWriteSet: %w", err) } - return &result + return &result, nil })() } diff --git a/off_chain_data/application-go/parser/payload.go b/off_chain_data/application-go/parser/payload.go index b61454a8..f8ba3105 100644 --- a/off_chain_data/application-go/parser/payload.go +++ b/off_chain_data/application-go/parser/payload.go @@ -18,34 +18,54 @@ func parsePayload(commonPayload *common.Payload, statusCode int32) *payload { return &payload{commonPayload, statusCode} } -func (p *payload) channelHeader() *common.ChannelHeader { - return utils.Cache(func() *common.ChannelHeader { - header := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header") +func (p *payload) channelHeader() (*common.ChannelHeader, error) { + return utils.Cache(func() (*common.ChannelHeader, error) { + funcName := "channelHeader" + + header, err := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header") + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } result := &common.ChannelHeader{} if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil { - panic(err) + return nil, fmt.Errorf("in %s: %w", funcName, err) } - return result + return result, nil })() } -func (p *payload) endorserTransaction() *endorserTransaction { - if !p.isEndorserTransaction() { - panic(fmt.Errorf("unexpected payload type: %d", p.channelHeader().GetType())) +func (p *payload) endorserTransaction() (*endorserTransaction, error) { + funcName := "endorserTransaction" + + is, err := p.isEndorserTransaction() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + if !is { + channelHeader, err := p.channelHeader() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType()) } result := &peer.Transaction{} if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil { - panic(err) + return nil, fmt.Errorf("in %s: %w", funcName, err) } - return parseEndorserTransaction(result) + return parseEndorserTransaction(result), nil } -func (p *payload) isEndorserTransaction() bool { - return p.channelHeader().GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION) +func (p *payload) isEndorserTransaction() (bool, error) { + channelHeader, err := p.channelHeader() + if err != nil { + return false, fmt.Errorf("in isEndorserTransaction: %w", err) + } + + return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil } func (p *payload) isValid() bool { diff --git a/off_chain_data/application-go/parser/transaction.go b/off_chain_data/application-go/parser/transaction.go index d7c618d9..7bead914 100644 --- a/off_chain_data/application-go/parser/transaction.go +++ b/off_chain_data/application-go/parser/transaction.go @@ -1,6 +1,8 @@ package parser import ( + "fmt" + "github.com/hyperledger/fabric-protos-go-apiv2/common" ) @@ -12,17 +14,29 @@ func newTransaction(payload *payload) *Transaction { return &Transaction{payload} } -func (t *Transaction) ChannelHeader() *common.ChannelHeader { +func (t *Transaction) ChannelHeader() (*common.ChannelHeader, error) { return t.payload.channelHeader() } -func (t *Transaction) NamespaceReadWriteSets() []*NamespaceReadWriteSet { +func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) { + funcName := "NamespaceReadWriteSets" + + endorserTransaction, err := t.payload.endorserTransaction() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + txReadWriteSets, err := endorserTransaction.readWriteSets() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + result := []*NamespaceReadWriteSet{} - for _, readWriteSet := range t.payload.endorserTransaction().readWriteSets() { + for _, readWriteSet := range txReadWriteSets { result = append(result, readWriteSet.namespaceReadWriteSets()...) } - return result + return result, nil } func (t *Transaction) IsValid() bool { diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go index fcf44c9f..530bf23f 100644 --- a/off_chain_data/application-go/processor/block.go +++ b/off_chain_data/application-go/processor/block.go @@ -30,11 +30,21 @@ func NewBlock( } func (b *block) Process() error { - blockNumber := b.parsedBlock.Number() + funcName := "Process" + + blockNumber, err := b.parsedBlock.Number() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } fmt.Println("\nReceived block", blockNumber) - for _, validTransaction := range b.validTransactions() { + validTransactions, err := b.validTransactions() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + + for _, validTransaction := range validTransactions { aTransaction := transaction{ blockNumber, validTransaction, @@ -43,46 +53,74 @@ func (b *block) Process() error { b.channelName, } if err := aTransaction.process(); err != nil { - return err + return fmt.Errorf("in %s: %w", funcName, err) } - transactionId := validTransaction.ChannelHeader().GetTxId() + channelHeader, err := validTransaction.ChannelHeader() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + transactionId := channelHeader.GetTxId() b.checkpointer.CheckpointTransaction(blockNumber, transactionId) } - b.checkpointer.CheckpointBlock(b.parsedBlock.Number()) + b.checkpointer.CheckpointBlock(blockNumber) return nil } -func (b *block) validTransactions() []*parser.Transaction { +func (b *block) validTransactions() ([]*parser.Transaction, error) { result := []*parser.Transaction{} - for _, transaction := range b.getNewTransactions() { + newTransactions, err := b.getNewTransactions() + if err != nil { + return nil, fmt.Errorf("in validTransactions: %w", err) + } + + for _, transaction := range newTransactions { if transaction.IsValid() { result = append(result, transaction) } } - return result + return result, nil } -func (b *block) getNewTransactions() []*parser.Transaction { - transactions := b.parsedBlock.Transactions() +func (b *block) getNewTransactions() ([]*parser.Transaction, error) { + funcName := "getNewTransactions" + + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } lastTransactionId := b.checkpointer.TransactionID() if lastTransactionId == "" { // No previously processed transactions within this block so all are new - return transactions + return transactions, nil } // Ignore transactions up to the last processed transaction ID - lastProcessedIndex := b.findLastProcessedIndex() - return transactions[lastProcessedIndex+1:] + lastProcessedIndex, err := b.findLastProcessedIndex() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + return transactions[lastProcessedIndex+1:], nil } -func (b *block) findLastProcessedIndex() int { +func (b *block) findLastProcessedIndex() (int, error) { + funcName := "findLastProcessedIndex" + + transactions, err := b.parsedBlock.Transactions() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + blockTransactionIds := []string{} - for _, transaction := range b.parsedBlock.Transactions() { - blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) + for _, transaction := range transactions { + channelHeader, err := transaction.ChannelHeader() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + blockTransactionIds = append(blockTransactionIds, channelHeader.GetTxId()) } lastTransactionId := b.checkpointer.TransactionID() @@ -92,27 +130,18 @@ func (b *block) findLastProcessedIndex() int { lastProcessedIndex = index } } + if lastProcessedIndex < 0 { - panic( - fmt.Errorf( - "checkpoint transaction ID %s not found in block %d containing transactions: %s", - lastTransactionId, - b.parsedBlock.Number(), - b.joinByComma(blockTransactionIds), - ), + blockNumber, err := b.parsedBlock.Number() + if err != nil { + return 0, fmt.Errorf("in %s: %w", funcName, err) + } + return lastProcessedIndex, newTxIdNotFoundError( + lastTransactionId, + blockNumber, + blockTransactionIds, ) } - return lastProcessedIndex -} -func (b *block) joinByComma(list []string) string { - result := "" - for index, item := range list { - if len(list)-1 == index { - result += item - } else { - result += item + ", " - } - } - return result + return lastProcessedIndex, nil } diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go index 5d705279..ec9c9608 100644 --- a/off_chain_data/application-go/processor/transaction.go +++ b/off_chain_data/application-go/processor/transaction.go @@ -15,9 +15,19 @@ type transaction struct { } func (t *transaction) process() error { - transactionId := t.transaction.ChannelHeader().GetTxId() + funcName := "process" + + channelHeader, err := t.transaction.ChannelHeader() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } + transactionId := channelHeader.GetTxId() + + writes, err := t.writes() + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } - writes := t.writes() if len(writes) == 0 { fmt.Println("Skipping read-only or system transaction", transactionId) return nil @@ -30,18 +40,29 @@ func (t *transaction) process() error { TransactionId: transactionId, Writes: writes, }); err != nil { - return err + return fmt.Errorf("in %s: %w", funcName, err) } + return nil } -func (t *transaction) writes() []store.Write { +func (t *transaction) writes() ([]store.Write, error) { + funcName := "writes" // TODO this entire code should live in the parser and just return the kvWrite which // we then map to store.Write and return - t.channelName = t.transaction.ChannelHeader().GetChannelId() + channelHeader, err := t.transaction.ChannelHeader() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + t.channelName = channelHeader.GetChannelId() + + nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{} - for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { + for _, nsReadWriteSet := range nsReadWriteSets { if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) } @@ -51,7 +72,12 @@ func (t *transaction) writes() []store.Write { for _, readWriteSet := range nonSystemCCReadWriteSets { namespace := readWriteSet.Namespace() - for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { + kvReadWriteSet, err := readWriteSet.ReadWriteSet() + if err != nil { + return nil, fmt.Errorf("in %s: %w", funcName, err) + } + + for _, kvWrite := range kvReadWriteSet.GetWrites() { writes = append(writes, store.Write{ ChannelName: t.channelName, Namespace: namespace, @@ -62,7 +88,7 @@ func (t *transaction) writes() []store.Write { } } - return writes + return writes, nil } func (t *transaction) isSystemChaincode(chaincodeName string) bool { diff --git a/off_chain_data/application-go/processor/txIdNotFoundError.go b/off_chain_data/application-go/processor/txIdNotFoundError.go new file mode 100644 index 00000000..7a8f230a --- /dev/null +++ b/off_chain_data/application-go/processor/txIdNotFoundError.go @@ -0,0 +1,32 @@ +package processor + +import "fmt" + +type txIdNotFoundError struct { + txId string + blockNumber uint64 + blockTxIds []string +} + +func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError { + return &txIdNotFoundError{ + txId, blockNumber, blockTxIds, + } +} + +func (t *txIdNotFoundError) Error() string { + format := "checkpoint transaction ID %s not found in block %d containing transactions: %s" + return fmt.Sprintf(format, t.txId, t.blockNumber, t.blockTxIdsJoinedByComma()) +} + +func (t *txIdNotFoundError) blockTxIdsJoinedByComma() string { + result := "" + for index, item := range t.blockTxIds { + if len(t.blockTxIds)-1 == index { + result += item + } else { + result += item + ", " + } + } + return result +} diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go index 432099d3..81794b6c 100644 --- a/off_chain_data/application-go/store/flatFille.go +++ b/off_chain_data/application-go/store/flatFille.go @@ -14,20 +14,21 @@ import ( var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log") var SimulatedFailureCount = getSimulatedFailureCount() var transactionCount uint = 0 // Used only to simulate failures -var ErrExpected = errors.New("[expected error]: simulated write failure") // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // This implementation just writes to a file. func ApplyWritesToOffChainStore(data LedgerUpdate) error { + funcName := "ApplyWritesToOffChainStore" + if err := simulateFailureIfRequired(); err != nil { - return err + return fmt.Errorf("in %s: %w", funcName, err) } writes := []string{} for _, write := range data.Writes { marshaled, err := json.Marshal(write) if err != nil { - panic(err) + return fmt.Errorf("in %s: %w", funcName, err) } writes = append(writes, string(marshaled)) @@ -35,16 +36,16 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) error { f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - panic(err) + return fmt.Errorf("in %s: %w", funcName, err) } if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { f.Close() - panic(err) + return fmt.Errorf("in %s: %w", funcName, err) } if err := f.Close(); err != nil { - panic(err) + return fmt.Errorf("in %s: %w", funcName, err) } return nil @@ -53,7 +54,7 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) error { func simulateFailureIfRequired() error { if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { transactionCount = 0 - return ErrExpected + return errors.New("expected error: simulated write failure") } transactionCount += 1 diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go index d6ba356e..532aa098 100644 --- a/off_chain_data/application-go/transact.go +++ b/off_chain_data/application-go/transact.go @@ -17,7 +17,10 @@ func transact(clientConnection *grpc.ClientConn) { if err != nil { panic((err)) } - defer gateway.Close() + defer func() { + gateway.Close() + fmt.Println("Gateway closed.") + }() contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) @@ -42,29 +45,45 @@ func (t *transactApp) run() { wg.Add(1) go func() { defer wg.Done() - t.transact() + + if err := t.transact(); err != nil { + fmt.Println("\033[31m[ERROR]\033[0m", err) + return + } }() } wg.Wait() } -func (t *transactApp) transact() { +func (t *transactApp) transact() error { + funcName := "transact" + anAsset := atb.NewAsset() - t.smartContract.CreateAsset(anAsset) + err := t.smartContract.CreateAsset(anAsset) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } fmt.Println("Created asset", anAsset.ID) // Transfer randomly 1 in 2 assets to a new owner. if utils.RandomInt(2) == 0 { newOwner := utils.DifferentElement(atb.Owners, anAsset.Owner) - oldOwner := t.smartContract.TransferAsset(anAsset.ID, newOwner) + oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner) } // Delete randomly 1 in 4 created assets. if utils.RandomInt(4) == 0 { - t.smartContract.DeleteAsset(anAsset.ID) + err := t.smartContract.DeleteAsset(anAsset.ID) + if err != nil { + return fmt.Errorf("in %s: %w", funcName, err) + } fmt.Println("Deleted asset", anAsset.ID) } + return nil } diff --git a/off_chain_data/application-go/utils/utils.go b/off_chain_data/application-go/utils/utils.go index 2700c860..c3714931 100644 --- a/off_chain_data/application-go/utils/utils.go +++ b/off_chain_data/application-go/utils/utils.go @@ -3,6 +3,7 @@ package utils import ( "crypto/rand" "errors" + "fmt" "math/big" "os" ) @@ -35,25 +36,32 @@ func DifferentElement(values []string, currentValue string) string { } // Return the value if it is defined; otherwise panics with an error message. -func AssertDefined[T any](value T, message string) T { +func AssertDefined[T any](value T, message string) (T, error) { if any(value) == any(nil) { - panic(errors.New(message)) + var zeroValue T + return zeroValue, errors.New(message) } - return value + return value, nil } // Wrap a function call with a cache. On first call the wrapped function is invoked to // obtain a result. Subsequent calls return the cached result. -func Cache[T any](f func() T) func() T { - value := any(nil) +func Cache[T any](f func() (T, error)) func() (T, error) { + var value T + var err error + var cached bool - return func() T { - if value == nil { - value = f() + return func() (T, error) { + if !cached { + value, err = f() + if err != nil { + var zeroValue T + return zeroValue, fmt.Errorf("in Cache: %w", err) + } + cached = true } - - return value.(T) + return value, nil } } diff --git a/off_chain_data/application-go/utils/utils_test.go b/off_chain_data/application-go/utils/utils_test.go index 2f8d4b38..d806c6e4 100644 --- a/off_chain_data/application-go/utils/utils_test.go +++ b/off_chain_data/application-go/utils/utils_test.go @@ -1,20 +1,27 @@ package utils_test import ( + "errors" "offChainData/utils" "testing" ) func Test_cachePrimitiveFunctionResult(t *testing.T) { counter := 0 - f := func() int { + f := func() (int, error) { counter++ - return 5 + return 5, nil } cachedFunc := utils.Cache(f) - result1 := cachedFunc() - result2 := cachedFunc() + result1, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + result2, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } if counter != 1 { t.Error("expected counter to be 1, but got", counter) @@ -25,13 +32,36 @@ func Test_cachePrimitiveFunctionResult(t *testing.T) { } } +func Test_whenCachedFunctionsErrors_returnError(t *testing.T) { + errorMsg := "error" + f := func() (int, error) { + return 0, errors.New(errorMsg) + } + + cachedFunc := utils.Cache(f) + _, err := cachedFunc() + if err == nil { + t.Fatal("expected error, but got", err) + } + + if err.Error() != errorMsg { + t.Fatal("expected error message to be 'error', but got", err) + } +} + func Test_cacheWrappedPrimitiveFunctionResult(t *testing.T) { controlValue := 5 - multiplyControlValueBy := func(n int) int { controlValue *= n; return controlValue } + multiplyControlValueBy := func(n int) (int, error) { controlValue *= n; return controlValue, nil } - cachedFunc := utils.Cache(func() int { return multiplyControlValueBy(5) }) - result1 := cachedFunc() - result2 := cachedFunc() + cachedFunc := utils.Cache(func() (int, error) { return multiplyControlValueBy(5) }) + result1, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + result2, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } if controlValue != 25 { t.Error("expected control value to be 25, but got", controlValue) @@ -48,17 +78,23 @@ func Test_cacheWrappedDataStructureResult(t *testing.T) { } controlStruct := &GreetMe{helloTo: "Hello "} - greet := func(name string) *GreetMe { controlStruct.helloTo += name; return controlStruct } + greet := func(name string) (*GreetMe, error) { controlStruct.helloTo += name; return controlStruct, nil } - cachedFunc := utils.Cache(func() *GreetMe { return greet("John Doe") }) - result1 := cachedFunc().helloTo - result2 := cachedFunc().helloTo + cachedFunc := utils.Cache(func() (*GreetMe, error) { return greet("John Doe") }) + result1, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } + result2, err := cachedFunc() + if err != nil { + t.Fatal("unexpected error:", err) + } if controlStruct.helloTo != "Hello John Doe" { t.Error("expected control value to be 'Hello John Doe', but got", controlStruct) } - if result1 != "Hello John Doe" || result2 != "Hello John Doe" { - t.Fatal("expected cached results to be 'Hello John Doe', but got", result1, result2) + if result1.helloTo != "Hello John Doe" || result2.helloTo != "Hello John Doe" { + t.Fatal("expected cached results to be 'Hello John Doe', but got", result1.helloTo, result2.helloTo) } }