fabric-samples/off_chain_data/application-go/processor/transaction.go
Stanislav Jakuschevskij 06c7445e91
Replace panic with error handling
Starting from processor.Block.Process, all methods now return errors
if something goes wrong while unpacking the blocks and reading the
transactions. In each function where the error is propagated back
to the client, it is wrapped in a message with the function name. This makes
it easier to track down the error and see the propagation chain. Finally,
the error is logged to the terminal and the goroutine shuts down
gracefully. The graceful shutdown executes all deferred functions, which
close the context, the checkpointer and the gateway.

Previously, panics were used everywhere, which was an issue because the
unpacking of the blocks happened in a goroutine. When a panic happens
in a goroutine, only the deferred functions of that goroutine are called
but not those of the client, which led to unexpected behavior.

The transact function is also executed in a goroutine; therefore the
same type of error handling was implemented there.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
2025-02-24 13:14:48 +01:00

104 lines
2.5 KiB
Go

package processor
import (
"fmt"
"offChainData/parser"
"offChainData/store"
"slices"
)
// transaction bundles a parsed transaction with the block number and store
// writer needed to turn its writes into a store.LedgerUpdate.
type transaction struct {
// blockNumber is copied into the BlockNumber field of the store.LedgerUpdate
// built by process.
blockNumber uint64
// transaction is the parsed transaction whose channel header and namespace
// read/write sets are read by process and writes.
transaction *parser.Transaction
// writeToStore is invoked by process with the assembled store.LedgerUpdate.
writeToStore store.Writer
// channelName is overwritten by writes with the channel ID taken from the
// channel header, and is stamped on every store.Write that writes produces.
channelName string
}
// process hands the transaction's writes to the store.
//
// The transaction ID is read from the channel header. When the transaction
// yields no writes, a "skipping" line is printed and nil is returned without
// calling the store writer. Otherwise a store.LedgerUpdate carrying the block
// number, the transaction ID and the writes is passed to writeToStore.
//
// Every error is returned wrapped as "in process: <cause>".
func (t *transaction) process() error {
	const funcName = "process"
	// wrap prefixes an error with this function's name so the propagation
	// chain stays visible to the caller.
	wrap := func(cause error) error {
		return fmt.Errorf("in %s: %w", funcName, cause)
	}

	header, err := t.transaction.ChannelHeader()
	if err != nil {
		return wrap(err)
	}
	txID := header.GetTxId()

	ledgerWrites, err := t.writes()
	if err != nil {
		return wrap(err)
	}

	// Nothing to persist: writes only reports non-system chaincode writes.
	if len(ledgerWrites) == 0 {
		fmt.Println("Skipping read-only or system transaction", txID)
		return nil
	}

	fmt.Println("Process transaction", txID)

	update := store.LedgerUpdate{
		BlockNumber:   t.blockNumber,
		TransactionId: txID,
		Writes:        ledgerWrites,
	}
	if err := t.writeToStore(update); err != nil {
		return wrap(err)
	}

	return nil
}
// writes returns one store.Write per key/value write found in the
// transaction's namespace read/write sets, ignoring every namespace for which
// isSystemChaincode reports true.
//
// As a side effect it stores the channel ID from the channel header in
// t.channelName; that value is stamped on each returned write. The returned
// slice is non-nil (possibly empty) on success and nil on error.
//
// Every error is returned wrapped as "in writes: <cause>".
func (t *transaction) writes() ([]store.Write, error) {
	const funcName = "writes"
	// wrap prefixes an error with this function's name so the propagation
	// chain stays visible to the caller.
	wrap := func(cause error) error {
		return fmt.Errorf("in %s: %w", funcName, cause)
	}

	// TODO this entire code should live in the parser and just return the kvWrite which
	// we then map to store.Write and return
	header, err := t.transaction.ChannelHeader()
	if err != nil {
		return nil, wrap(err)
	}
	t.channelName = header.GetChannelId()

	allSets, err := t.transaction.NamespaceReadWriteSets()
	if err != nil {
		return nil, wrap(err)
	}

	// First pass: keep only the read/write sets of non-system chaincodes.
	userSets := []*parser.NamespaceReadWriteSet{}
	for _, set := range allSets {
		if t.isSystemChaincode(set.Namespace()) {
			continue
		}
		userSets = append(userSets, set)
	}

	// Second pass: flatten the remaining sets into store.Write records.
	collected := []store.Write{}
	for _, set := range userSets {
		ns := set.Namespace()

		kvSet, err := set.ReadWriteSet()
		if err != nil {
			return nil, wrap(err)
		}

		for _, kv := range kvSet.GetWrites() {
			entry := store.Write{
				ChannelName: t.channelName,
				Namespace:   ns,
				Key:         kv.GetKey(),
				IsDelete:    kv.GetIsDelete(),
				// Bytes are converted to text purely for readability in output.
				Value: string(kv.GetValue()),
			}
			collected = append(collected, entry)
		}
	}

	return collected, nil
}
// isSystemChaincode reports whether chaincodeName exactly matches one of the
// system chaincode names listed below. The comparison is case-sensitive and
// does not trim whitespace.
func (t *transaction) isSystemChaincode(chaincodeName string) bool {
	reserved := [...]string{"_lifecycle", "cscc", "escc", "lscc", "qscc", "vscc"}

	// slices.Index yields -1 when the name is absent from the list.
	return slices.Index(reserved[:], chaincodeName) >= 0
}