Replace panic with error handling

Starting from the processor.Block.Process all methods now return errors
if something goes wrong with unpacking of the blocks and reading the
transactions. In each function where the error is being propagated back
to client it is wrapped in a message with the function name. This makes
it easier to track down the error and see the propagation chain. Finally
the error is logged to the terminal and the go routine shuts down
gracefully. The graceful shutdown executes all deferred functions which
close the context, the checkpointer and the gateway.

Before panics were used everywhere which was an issue because the
unpacking of the blocks happened in a go routine. When a panic happens
in a go routine only the deferred functions of the go routine are called
but not those of the client which lead to unexpected behavior.

The transact function is also executed in a go routine therefore the
same typo of error handling was implemented there.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2025-01-04 13:26:25 +01:00
parent 1364645c6c
commit 06c7445e91
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
16 changed files with 422 additions and 172 deletions

View file

@ -6,6 +6,7 @@
package contract package contract
import ( import (
"fmt"
"strconv" "strconv"
"github.com/hyperledger/fabric-gateway/pkg/client" "github.com/hyperledger/fabric-gateway/pkg/client"
@ -19,7 +20,7 @@ func NewAssetTransferBasic(contract *client.Contract) *AssetTransferBasic {
return &AssetTransferBasic{contract} return &AssetTransferBasic{contract}
} }
func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) { func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) error {
if _, err := atb.contract.Submit( if _, err := atb.contract.Submit(
"CreateAsset", "CreateAsset",
client.WithArguments( client.WithArguments(
@ -29,11 +30,12 @@ func (atb *AssetTransferBasic) CreateAsset(anAsset Asset) {
anAsset.Owner, anAsset.Owner,
strconv.FormatUint(anAsset.AppraisedValue, 10), strconv.FormatUint(anAsset.AppraisedValue, 10),
)); err != nil { )); err != nil {
panic(err) return fmt.Errorf("in CreateAsset: %w", err)
} }
return nil
} }
func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) string { func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) (string, error) {
result, err := atb.contract.Submit( result, err := atb.contract.Submit(
"TransferAsset", "TransferAsset",
client.WithArguments( client.WithArguments(
@ -42,27 +44,28 @@ func (atb *AssetTransferBasic) TransferAsset(id, newOwner string) string {
), ),
) )
if err != nil { if err != nil {
panic(err) return "", fmt.Errorf("in TransferAsset: %w", err)
} }
return string(result) return string(result), nil
} }
func (atb *AssetTransferBasic) DeleteAsset(id string) { func (atb *AssetTransferBasic) DeleteAsset(id string) error {
if _, err := atb.contract.Submit( if _, err := atb.contract.Submit(
"DeleteAsset", "DeleteAsset",
client.WithArguments( client.WithArguments(
id, id,
), ),
); err != nil { ); err != nil {
panic(err) return fmt.Errorf("in DeleteAsset: %w", err)
} }
return nil
} }
func (atb *AssetTransferBasic) GetAllAssets() []byte { func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) {
result, err := atb.contract.Evaluate("GetAllAssets") result, err := atb.contract.Evaluate("GetAllAssets")
if err != nil { if err != nil {
panic(err) return []byte{}, fmt.Errorf("in GetAllAssets: %w", err)
} }
return result return result, nil
} }

View file

@ -26,7 +26,10 @@ func getAllAssets(clientConnection *grpc.ClientConn) {
contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) contract := gateway.GetNetwork(channelName).GetContract(chaincodeName)
smartContract := atb.NewAssetTransferBasic(contract) smartContract := atb.NewAssetTransferBasic(contract)
assets := smartContract.GetAllAssets() assets, err := smartContract.GetAllAssets()
if err != nil {
panic((err))
}
fmt.Println(formatJSON(assets)) fmt.Println(formatJSON(assets))
} }

View file

@ -80,8 +80,8 @@ func listen(clientConnection *grpc.ClientConn) {
channelName, channelName,
) )
if err := blockProcessor.Process(); err == store.ErrExpected { if err := blockProcessor.Process(); err != nil {
fmt.Println(err) fmt.Println("\033[31m[ERROR]\033[0m", err)
return return
} }
} }

View file

@ -14,74 +14,102 @@ type Block struct {
} }
func ParseBlock(block *common.Block) *Block { func ParseBlock(block *common.Block) *Block {
return &Block{block, nil} return &Block{block, []*Transaction{}}
} }
func (b *Block) Number() uint64 { func (b *Block) Number() (uint64, error) {
header := utils.AssertDefined(b.block.GetHeader(), "missing block header") header, err := utils.AssertDefined(b.block.GetHeader(), "missing block header")
return header.GetNumber() if err != nil {
return 0, fmt.Errorf("in Number: %w", err)
}
return header.GetNumber(), nil
} }
func (b *Block) Transactions() []*Transaction { func (b *Block) Transactions() ([]*Transaction, error) {
return utils.Cache(func() []*Transaction { return utils.Cache(func() ([]*Transaction, error) {
envelopes := b.unmarshalEnvelopesFromBlockData() funcName := "Transactions"
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
commonPayloads := b.unmarshalPayloadsFrom(envelopes) commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
payloads := b.parse(commonPayloads) payloads, err := b.parse(commonPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return b.createTransactionsFrom(payloads) return b.createTransactionsFrom(payloads), nil
})() })()
} }
func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope { func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) {
result := []*common.Envelope{} result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() { for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{} envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil { if err := proto.Unmarshal(blockData, envelope); err != nil {
panic(err) return nil, fmt.Errorf("in unmarshalEnvelopesFromBlockData: %w", err)
} }
result = append(result, envelope) result = append(result, envelope)
} }
return result return result, nil
} }
func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload { func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Payload, error) {
result := []*common.Payload{} result := []*common.Payload{}
for _, envelope := range envelopes { for _, envelope := range envelopes {
commonPayload := &common.Payload{} commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
panic(err) return nil, fmt.Errorf("in unmarshalPayloadsFrom: %w", err)
} }
result = append(result, commonPayload) result = append(result, commonPayload)
} }
return result return result, nil
}
func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
funcName := "parse"
validationCodes, err := b.extractTransactionValidationCodes()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
} }
func (b *Block) parse(commonPayloads []*common.Payload) []*payload {
validationCodes := b.extractTransactionValidationCodes()
result := []*payload{} result := []*payload{}
for i, commonPayload := range commonPayloads { for i, commonPayload := range commonPayloads {
payload := parsePayload( statusCode, err := utils.AssertDefined(
commonPayload,
int32(utils.AssertDefined(
validationCodes[i], validationCodes[i],
fmt.Sprint("missing validation code index", i), fmt.Sprint("missing validation code index", i),
),
),
) )
if payload.isEndorserTransaction() { if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
payload := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransaction()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
if is {
result = append(result, payload) result = append(result, payload)
} }
} }
return result
return result, nil
} }
func (b *Block) extractTransactionValidationCodes() []byte { func (b *Block) extractTransactionValidationCodes() ([]byte, error) {
metadata := utils.AssertDefined( metadata, err := utils.AssertDefined(
b.block.GetMetadata(), b.block.GetMetadata(),
"missing block metadata", "missing block metadata",
) )
if err != nil {
return nil, fmt.Errorf("in extractTransactionValidationCodes: %w", err)
}
return utils.AssertDefined( return utils.AssertDefined(
metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER], metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER],

View file

@ -35,12 +35,17 @@ func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) {
} }
parsedEndorserTransaction := parseEndorserTransaction(transaction) parsedEndorserTransaction := parseEndorserTransaction(transaction)
if len(parsedEndorserTransaction.readWriteSets()) != 1 { readWriteSets, err := parsedEndorserTransaction.readWriteSets()
t.Fatal("expected 1 ReadWriteSet, got", len(parsedEndorserTransaction.readWriteSets())) if err != nil {
t.Fatal("unexpected error:", err)
}
if len(readWriteSets) != 1 {
t.Fatal("expected 1 ReadWriteSet, got", len(readWriteSets))
} }
assertReadWriteSet( assertReadWriteSet(
parsedEndorserTransaction.readWriteSets()[0].namespaceReadWriteSets()[0], readWriteSets[0].namespaceReadWriteSets()[0],
expectedNamespace, expectedNamespace,
expectedAsset, expectedAsset,
t, t,
@ -57,7 +62,10 @@ func assertReadWriteSet(
t.Errorf("expected namespace %s, got %s", expectedNamespace, parsedNsRwSet.Namespace()) t.Errorf("expected namespace %s, got %s", expectedNamespace, parsedNsRwSet.Namespace())
} }
actualKVRWSet := parsedNsRwSet.ReadWriteSet() actualKVRWSet, err := parsedNsRwSet.ReadWriteSet()
if err != nil {
t.Fatal("unexpected error:", err)
}
if len(actualKVRWSet.Writes) != 1 { if len(actualKVRWSet.Writes) != 1 {
t.Fatal("expected 1 write, got", len(actualKVRWSet.Writes)) t.Fatal("expected 1 write, got", len(actualKVRWSet.Writes))
} }

View file

@ -1,6 +1,7 @@
package parser package parser
import ( import (
"fmt"
"offChainData/utils" "offChainData/utils"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
@ -16,83 +17,104 @@ func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransactio
return &endorserTransaction{transaction} return &endorserTransaction{transaction}
} }
func (p *endorserTransaction) readWriteSets() []*readWriteSet { func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) {
return utils.Cache(func() []*readWriteSet { return utils.Cache(func() ([]*readWriteSet, error) {
chaincodeActionPayloads := p.unmarshalChaincodeActionPayloads() funcName := "readWriteSets"
chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
chaincodeEndorsedActions := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads) chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
proposalResponsePayloads := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions) proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
chaincodeActions := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads) chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
txReadWriteSets := p.unmarshalTxReadWriteSetsFrom(chaincodeActions) txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return p.parseReadWriteSets(txReadWriteSets) return p.parseReadWriteSets(txReadWriteSets), nil
})() })()
} }
func (p *endorserTransaction) unmarshalChaincodeActionPayloads() []*peer.ChaincodeActionPayload { func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) {
result := []*peer.ChaincodeActionPayload{} result := []*peer.ChaincodeActionPayload{}
for _, transactionAction := range p.transaction.GetActions() { for _, transactionAction := range p.transaction.GetActions() {
chaincodeActionPayload := &peer.ChaincodeActionPayload{} chaincodeActionPayload := &peer.ChaincodeActionPayload{}
if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil { if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil {
panic(err) return nil, fmt.Errorf("in unmarshalChaincodeActionPayloads: %w", err)
} }
result = append(result, chaincodeActionPayload) result = append(result, chaincodeActionPayload)
} }
return result return result, nil
} }
func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) []*peer.ChaincodeEndorsedAction { func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) {
result := []*peer.ChaincodeEndorsedAction{} result := []*peer.ChaincodeEndorsedAction{}
for _, payload := range chaincodeActionPayloads { for _, payload := range chaincodeActionPayloads {
result = append( chaincodeEndorsedAction, err := utils.AssertDefined(
result,
utils.AssertDefined(
payload.GetAction(), payload.GetAction(),
"missing chaincode endorsed action", "missing chaincode endorsed action",
),
) )
} if err != nil {
return result return nil, fmt.Errorf("in extractChaincodeEndorsedActionsFrom: %w", err)
} }
func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) []*peer.ProposalResponsePayload { result = append(
result,
chaincodeEndorsedAction,
)
}
return result, nil
}
func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) ([]*peer.ProposalResponsePayload, error) {
result := []*peer.ProposalResponsePayload{} result := []*peer.ProposalResponsePayload{}
for _, endorsedAction := range chaincodeEndorsedActions { for _, endorsedAction := range chaincodeEndorsedActions {
proposalResponsePayload := &peer.ProposalResponsePayload{} proposalResponsePayload := &peer.ProposalResponsePayload{}
if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil { if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil {
panic(err) return nil, fmt.Errorf("in unmarshalProposalResponsePayloadsFrom: %w", err)
} }
result = append(result, proposalResponsePayload) result = append(result, proposalResponsePayload)
} }
return result return result, nil
} }
func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) []*peer.ChaincodeAction { func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) ([]*peer.ChaincodeAction, error) {
result := []*peer.ChaincodeAction{} result := []*peer.ChaincodeAction{}
for _, proposalResponsePayload := range proposalResponsePayloads { for _, proposalResponsePayload := range proposalResponsePayloads {
chaincodeAction := &peer.ChaincodeAction{} chaincodeAction := &peer.ChaincodeAction{}
if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil { if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil {
panic(err) return nil, fmt.Errorf("in unmarshalChaincodeActionsFrom: %w", err)
} }
result = append(result, chaincodeAction) result = append(result, chaincodeAction)
} }
return result return result, nil
} }
func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) []*rwset.TxReadWriteSet { func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) ([]*rwset.TxReadWriteSet, error) {
result := []*rwset.TxReadWriteSet{} result := []*rwset.TxReadWriteSet{}
for _, chaincodeAction := range chaincodeActions { for _, chaincodeAction := range chaincodeActions {
txReadWriteSet := &rwset.TxReadWriteSet{} txReadWriteSet := &rwset.TxReadWriteSet{}
if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil { if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil {
continue return nil, fmt.Errorf("in unmarshalTxReadWriteSetsFrom: %w", err)
} }
result = append(result, txReadWriteSet) result = append(result, txReadWriteSet)
} }
return result return result, nil
} }
func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet { func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet {

View file

@ -1,6 +1,7 @@
package parser package parser
import ( import (
"fmt"
"offChainData/utils" "offChainData/utils"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset" "github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
@ -20,13 +21,13 @@ func (p *NamespaceReadWriteSet) Namespace() string {
return p.nsReadWriteSet.GetNamespace() return p.nsReadWriteSet.GetNamespace()
} }
func (p *NamespaceReadWriteSet) ReadWriteSet() *kvrwset.KVRWSet { func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) {
return utils.Cache(func() *kvrwset.KVRWSet { return utils.Cache(func() (*kvrwset.KVRWSet, error) {
result := kvrwset.KVRWSet{} result := kvrwset.KVRWSet{}
if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil { if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil {
panic(err) return nil, fmt.Errorf("in ReadWriteSet: %w", err)
} }
return &result return &result, nil
})() })()
} }

View file

@ -18,34 +18,54 @@ func parsePayload(commonPayload *common.Payload, statusCode int32) *payload {
return &payload{commonPayload, statusCode} return &payload{commonPayload, statusCode}
} }
func (p *payload) channelHeader() *common.ChannelHeader { func (p *payload) channelHeader() (*common.ChannelHeader, error) {
return utils.Cache(func() *common.ChannelHeader { return utils.Cache(func() (*common.ChannelHeader, error) {
header := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header") funcName := "channelHeader"
header, err := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header")
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
result := &common.ChannelHeader{} result := &common.ChannelHeader{}
if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil { if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil {
panic(err) return nil, fmt.Errorf("in %s: %w", funcName, err)
} }
return result return result, nil
})() })()
} }
func (p *payload) endorserTransaction() *endorserTransaction { func (p *payload) endorserTransaction() (*endorserTransaction, error) {
if !p.isEndorserTransaction() { funcName := "endorserTransaction"
panic(fmt.Errorf("unexpected payload type: %d", p.channelHeader().GetType()))
is, err := p.isEndorserTransaction()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
if !is {
channelHeader, err := p.channelHeader()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType())
} }
result := &peer.Transaction{} result := &peer.Transaction{}
if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil { if err := proto.Unmarshal(p.commonPayload.GetData(), result); err != nil {
panic(err) return nil, fmt.Errorf("in %s: %w", funcName, err)
} }
return parseEndorserTransaction(result) return parseEndorserTransaction(result), nil
} }
func (p *payload) isEndorserTransaction() bool { func (p *payload) isEndorserTransaction() (bool, error) {
return p.channelHeader().GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION) channelHeader, err := p.channelHeader()
if err != nil {
return false, fmt.Errorf("in isEndorserTransaction: %w", err)
}
return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil
} }
func (p *payload) isValid() bool { func (p *payload) isValid() bool {

View file

@ -1,6 +1,8 @@
package parser package parser
import ( import (
"fmt"
"github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/common"
) )
@ -12,17 +14,29 @@ func newTransaction(payload *payload) *Transaction {
return &Transaction{payload} return &Transaction{payload}
} }
func (t *Transaction) ChannelHeader() *common.ChannelHeader { func (t *Transaction) ChannelHeader() (*common.ChannelHeader, error) {
return t.payload.channelHeader() return t.payload.channelHeader()
} }
func (t *Transaction) NamespaceReadWriteSets() []*NamespaceReadWriteSet { func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) {
funcName := "NamespaceReadWriteSets"
endorserTransaction, err := t.payload.endorserTransaction()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
txReadWriteSets, err := endorserTransaction.readWriteSets()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
result := []*NamespaceReadWriteSet{} result := []*NamespaceReadWriteSet{}
for _, readWriteSet := range t.payload.endorserTransaction().readWriteSets() { for _, readWriteSet := range txReadWriteSets {
result = append(result, readWriteSet.namespaceReadWriteSets()...) result = append(result, readWriteSet.namespaceReadWriteSets()...)
} }
return result return result, nil
} }
func (t *Transaction) IsValid() bool { func (t *Transaction) IsValid() bool {

View file

@ -30,11 +30,21 @@ func NewBlock(
} }
func (b *block) Process() error { func (b *block) Process() error {
blockNumber := b.parsedBlock.Number() funcName := "Process"
blockNumber, err := b.parsedBlock.Number()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
fmt.Println("\nReceived block", blockNumber) fmt.Println("\nReceived block", blockNumber)
for _, validTransaction := range b.validTransactions() { validTransactions, err := b.validTransactions()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
for _, validTransaction := range validTransactions {
aTransaction := transaction{ aTransaction := transaction{
blockNumber, blockNumber,
validTransaction, validTransaction,
@ -43,46 +53,74 @@ func (b *block) Process() error {
b.channelName, b.channelName,
} }
if err := aTransaction.process(); err != nil { if err := aTransaction.process(); err != nil {
return err return fmt.Errorf("in %s: %w", funcName, err)
} }
transactionId := validTransaction.ChannelHeader().GetTxId() channelHeader, err := validTransaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionId := channelHeader.GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId) b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
} }
b.checkpointer.CheckpointBlock(b.parsedBlock.Number()) b.checkpointer.CheckpointBlock(blockNumber)
return nil return nil
} }
func (b *block) validTransactions() []*parser.Transaction { func (b *block) validTransactions() ([]*parser.Transaction, error) {
result := []*parser.Transaction{} result := []*parser.Transaction{}
for _, transaction := range b.getNewTransactions() { newTransactions, err := b.getNewTransactions()
if err != nil {
return nil, fmt.Errorf("in validTransactions: %w", err)
}
for _, transaction := range newTransactions {
if transaction.IsValid() { if transaction.IsValid() {
result = append(result, transaction) result = append(result, transaction)
} }
} }
return result return result, nil
} }
func (b *block) getNewTransactions() []*parser.Transaction { func (b *block) getNewTransactions() ([]*parser.Transaction, error) {
transactions := b.parsedBlock.Transactions() funcName := "getNewTransactions"
transactions, err := b.parsedBlock.Transactions()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
lastTransactionId := b.checkpointer.TransactionID() lastTransactionId := b.checkpointer.TransactionID()
if lastTransactionId == "" { if lastTransactionId == "" {
// No previously processed transactions within this block so all are new // No previously processed transactions within this block so all are new
return transactions return transactions, nil
} }
// Ignore transactions up to the last processed transaction ID // Ignore transactions up to the last processed transaction ID
lastProcessedIndex := b.findLastProcessedIndex() lastProcessedIndex, err := b.findLastProcessedIndex()
return transactions[lastProcessedIndex+1:] if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return transactions[lastProcessedIndex+1:], nil
}
func (b *block) findLastProcessedIndex() (int, error) {
funcName := "findLastProcessedIndex"
transactions, err := b.parsedBlock.Transactions()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
} }
func (b *block) findLastProcessedIndex() int {
blockTransactionIds := []string{} blockTransactionIds := []string{}
for _, transaction := range b.parsedBlock.Transactions() { for _, transaction := range transactions {
blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) channelHeader, err := transaction.ChannelHeader()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
blockTransactionIds = append(blockTransactionIds, channelHeader.GetTxId())
} }
lastTransactionId := b.checkpointer.TransactionID() lastTransactionId := b.checkpointer.TransactionID()
@ -92,27 +130,18 @@ func (b *block) findLastProcessedIndex() int {
lastProcessedIndex = index lastProcessedIndex = index
} }
} }
if lastProcessedIndex < 0 { if lastProcessedIndex < 0 {
panic( blockNumber, err := b.parsedBlock.Number()
fmt.Errorf( if err != nil {
"checkpoint transaction ID %s not found in block %d containing transactions: %s", return 0, fmt.Errorf("in %s: %w", funcName, err)
}
return lastProcessedIndex, newTxIdNotFoundError(
lastTransactionId, lastTransactionId,
b.parsedBlock.Number(), blockNumber,
b.joinByComma(blockTransactionIds), blockTransactionIds,
),
) )
} }
return lastProcessedIndex
}
func (b *block) joinByComma(list []string) string { return lastProcessedIndex, nil
result := ""
for index, item := range list {
if len(list)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
} }

View file

@ -15,9 +15,19 @@ type transaction struct {
} }
func (t *transaction) process() error { func (t *transaction) process() error {
transactionId := t.transaction.ChannelHeader().GetTxId() funcName := "process"
channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionId := channelHeader.GetTxId()
writes, err := t.writes()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
writes := t.writes()
if len(writes) == 0 { if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId) fmt.Println("Skipping read-only or system transaction", transactionId)
return nil return nil
@ -30,18 +40,29 @@ func (t *transaction) process() error {
TransactionId: transactionId, TransactionId: transactionId,
Writes: writes, Writes: writes,
}); err != nil { }); err != nil {
return err return fmt.Errorf("in %s: %w", funcName, err)
} }
return nil return nil
} }
func (t *transaction) writes() []store.Write { func (t *transaction) writes() ([]store.Write, error) {
funcName := "writes"
// TODO this entire code should live in the parser and just return the kvWrite which // TODO this entire code should live in the parser and just return the kvWrite which
// we then map to store.Write and return // we then map to store.Write and return
t.channelName = t.transaction.ChannelHeader().GetChannelId() channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
t.channelName = channelHeader.GetChannelId()
nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{} nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { for _, nsReadWriteSet := range nsReadWriteSets {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
} }
@ -51,7 +72,12 @@ func (t *transaction) writes() []store.Write {
for _, readWriteSet := range nonSystemCCReadWriteSets { for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace() namespace := readWriteSet.Namespace()
for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { kvReadWriteSet, err := readWriteSet.ReadWriteSet()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
for _, kvWrite := range kvReadWriteSet.GetWrites() {
writes = append(writes, store.Write{ writes = append(writes, store.Write{
ChannelName: t.channelName, ChannelName: t.channelName,
Namespace: namespace, Namespace: namespace,
@ -62,7 +88,7 @@ func (t *transaction) writes() []store.Write {
} }
} }
return writes return writes, nil
} }
func (t *transaction) isSystemChaincode(chaincodeName string) bool { func (t *transaction) isSystemChaincode(chaincodeName string) bool {

View file

@ -0,0 +1,32 @@
package processor
import "fmt"
type txIdNotFoundError struct {
txId string
blockNumber uint64
blockTxIds []string
}
func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError {
return &txIdNotFoundError{
txId, blockNumber, blockTxIds,
}
}
func (t *txIdNotFoundError) Error() string {
format := "checkpoint transaction ID %s not found in block %d containing transactions: %s"
return fmt.Sprintf(format, t.txId, t.blockNumber, t.blockTxIdsJoinedByComma())
}
func (t *txIdNotFoundError) blockTxIdsJoinedByComma() string {
result := ""
for index, item := range t.blockTxIds {
if len(t.blockTxIds)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
}

View file

@ -14,20 +14,21 @@ import (
var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log") var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log")
var SimulatedFailureCount = getSimulatedFailureCount() var SimulatedFailureCount = getSimulatedFailureCount()
var transactionCount uint = 0 // Used only to simulate failures var transactionCount uint = 0 // Used only to simulate failures
var ErrExpected = errors.New("[expected error]: simulated write failure")
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file. // This implementation just writes to a file.
func ApplyWritesToOffChainStore(data LedgerUpdate) error { func ApplyWritesToOffChainStore(data LedgerUpdate) error {
funcName := "ApplyWritesToOffChainStore"
if err := simulateFailureIfRequired(); err != nil { if err := simulateFailureIfRequired(); err != nil {
return err return fmt.Errorf("in %s: %w", funcName, err)
} }
writes := []string{} writes := []string{}
for _, write := range data.Writes { for _, write := range data.Writes {
marshaled, err := json.Marshal(write) marshaled, err := json.Marshal(write)
if err != nil { if err != nil {
panic(err) return fmt.Errorf("in %s: %w", funcName, err)
} }
writes = append(writes, string(marshaled)) writes = append(writes, string(marshaled))
@ -35,16 +36,16 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) error {
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
panic(err) return fmt.Errorf("in %s: %w", funcName, err)
} }
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
f.Close() f.Close()
panic(err) return fmt.Errorf("in %s: %w", funcName, err)
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
panic(err) return fmt.Errorf("in %s: %w", funcName, err)
} }
return nil return nil
@ -53,7 +54,7 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) error {
func simulateFailureIfRequired() error { func simulateFailureIfRequired() error {
if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount {
transactionCount = 0 transactionCount = 0
return ErrExpected return errors.New("expected error: simulated write failure")
} }
transactionCount += 1 transactionCount += 1

View file

@ -17,7 +17,10 @@ func transact(clientConnection *grpc.ClientConn) {
if err != nil { if err != nil {
panic((err)) panic((err))
} }
defer gateway.Close() defer func() {
gateway.Close()
fmt.Println("Gateway closed.")
}()
contract := gateway.GetNetwork(channelName).GetContract(chaincodeName) contract := gateway.GetNetwork(channelName).GetContract(chaincodeName)
@ -42,29 +45,45 @@ func (t *transactApp) run() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
t.transact()
if err := t.transact(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}() }()
} }
wg.Wait() wg.Wait()
} }
func (t *transactApp) transact() { func (t *transactApp) transact() error {
funcName := "transact"
anAsset := atb.NewAsset() anAsset := atb.NewAsset()
t.smartContract.CreateAsset(anAsset) err := t.smartContract.CreateAsset(anAsset)
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
fmt.Println("Created asset", anAsset.ID) fmt.Println("Created asset", anAsset.ID)
// Transfer randomly 1 in 2 assets to a new owner. // Transfer randomly 1 in 2 assets to a new owner.
if utils.RandomInt(2) == 0 { if utils.RandomInt(2) == 0 {
newOwner := utils.DifferentElement(atb.Owners, anAsset.Owner) newOwner := utils.DifferentElement(atb.Owners, anAsset.Owner)
oldOwner := t.smartContract.TransferAsset(anAsset.ID, newOwner) oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner)
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner) fmt.Printf("Transferred asset %s from %s to %s\n", anAsset.ID, oldOwner, newOwner)
} }
// Delete randomly 1 in 4 created assets. // Delete randomly 1 in 4 created assets.
if utils.RandomInt(4) == 0 { if utils.RandomInt(4) == 0 {
t.smartContract.DeleteAsset(anAsset.ID) err := t.smartContract.DeleteAsset(anAsset.ID)
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
fmt.Println("Deleted asset", anAsset.ID) fmt.Println("Deleted asset", anAsset.ID)
} }
return nil
} }

View file

@ -3,6 +3,7 @@ package utils
import ( import (
"crypto/rand" "crypto/rand"
"errors" "errors"
"fmt"
"math/big" "math/big"
"os" "os"
) )
@ -35,25 +36,32 @@ func DifferentElement(values []string, currentValue string) string {
} }
// Return the value if it is defined; otherwise panics with an error message. // Return the value if it is defined; otherwise panics with an error message.
func AssertDefined[T any](value T, message string) T { func AssertDefined[T any](value T, message string) (T, error) {
if any(value) == any(nil) { if any(value) == any(nil) {
panic(errors.New(message)) var zeroValue T
return zeroValue, errors.New(message)
} }
return value return value, nil
} }
// Wrap a function call with a cache. On first call the wrapped function is invoked to // Wrap a function call with a cache. On first call the wrapped function is invoked to
// obtain a result. Subsequent calls return the cached result. // obtain a result. Subsequent calls return the cached result.
func Cache[T any](f func() T) func() T { func Cache[T any](f func() (T, error)) func() (T, error) {
value := any(nil) var value T
var err error
var cached bool
return func() T { return func() (T, error) {
if value == nil { if !cached {
value = f() value, err = f()
if err != nil {
var zeroValue T
return zeroValue, fmt.Errorf("in Cache: %w", err)
} }
cached = true
return value.(T) }
return value, nil
} }
} }

View file

@ -1,20 +1,27 @@
package utils_test package utils_test
import ( import (
"errors"
"offChainData/utils" "offChainData/utils"
"testing" "testing"
) )
func Test_cachePrimitiveFunctionResult(t *testing.T) { func Test_cachePrimitiveFunctionResult(t *testing.T) {
counter := 0 counter := 0
f := func() int { f := func() (int, error) {
counter++ counter++
return 5 return 5, nil
} }
cachedFunc := utils.Cache(f) cachedFunc := utils.Cache(f)
result1 := cachedFunc() result1, err := cachedFunc()
result2 := cachedFunc() if err != nil {
t.Fatal("unexpected error:", err)
}
result2, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
if counter != 1 { if counter != 1 {
t.Error("expected counter to be 1, but got", counter) t.Error("expected counter to be 1, but got", counter)
@ -25,13 +32,36 @@ func Test_cachePrimitiveFunctionResult(t *testing.T) {
} }
} }
func Test_whenCachedFunctionsErrors_returnError(t *testing.T) {
errorMsg := "error"
f := func() (int, error) {
return 0, errors.New(errorMsg)
}
cachedFunc := utils.Cache(f)
_, err := cachedFunc()
if err == nil {
t.Fatal("expected error, but got", err)
}
if err.Error() != errorMsg {
t.Fatal("expected error message to be 'error', but got", err)
}
}
func Test_cacheWrappedPrimitiveFunctionResult(t *testing.T) { func Test_cacheWrappedPrimitiveFunctionResult(t *testing.T) {
controlValue := 5 controlValue := 5
multiplyControlValueBy := func(n int) int { controlValue *= n; return controlValue } multiplyControlValueBy := func(n int) (int, error) { controlValue *= n; return controlValue, nil }
cachedFunc := utils.Cache(func() int { return multiplyControlValueBy(5) }) cachedFunc := utils.Cache(func() (int, error) { return multiplyControlValueBy(5) })
result1 := cachedFunc() result1, err := cachedFunc()
result2 := cachedFunc() if err != nil {
t.Fatal("unexpected error:", err)
}
result2, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
if controlValue != 25 { if controlValue != 25 {
t.Error("expected control value to be 25, but got", controlValue) t.Error("expected control value to be 25, but got", controlValue)
@ -48,17 +78,23 @@ func Test_cacheWrappedDataStructureResult(t *testing.T) {
} }
controlStruct := &GreetMe{helloTo: "Hello "} controlStruct := &GreetMe{helloTo: "Hello "}
greet := func(name string) *GreetMe { controlStruct.helloTo += name; return controlStruct } greet := func(name string) (*GreetMe, error) { controlStruct.helloTo += name; return controlStruct, nil }
cachedFunc := utils.Cache(func() *GreetMe { return greet("John Doe") }) cachedFunc := utils.Cache(func() (*GreetMe, error) { return greet("John Doe") })
result1 := cachedFunc().helloTo result1, err := cachedFunc()
result2 := cachedFunc().helloTo if err != nil {
t.Fatal("unexpected error:", err)
}
result2, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
if controlStruct.helloTo != "Hello John Doe" { if controlStruct.helloTo != "Hello John Doe" {
t.Error("expected control value to be 'Hello John Doe', but got", controlStruct) t.Error("expected control value to be 'Hello John Doe', but got", controlStruct)
} }
if result1 != "Hello John Doe" || result2 != "Hello John Doe" { if result1.helloTo != "Hello John Doe" || result2.helloTo != "Hello John Doe" {
t.Fatal("expected cached results to be 'Hello John Doe', but got", result1, result2) t.Fatal("expected cached results to be 'Hello John Doe', but got", result1.helloTo, result2.helloTo)
} }
} }