From b04bda5b1164f0ff7bf51b71a0435bc1d1cc23d0 Mon Sep 17 00:00:00 2001 From: Stanislav Jakuschevskij Date: Tue, 31 Dec 2024 12:51:52 +0100 Subject: [PATCH] Extract block processor and store from listener Created packages for the flat file store and the processor and moved functions, variables and constants from listener.go to those packages. Encapsulated everything not used outside the packages, introduced model.go files which later might be extracted into a model package and renamed parser/parsedBlock.go to parser/block.go. Signed-off-by: Stanislav Jakuschevskij --- off_chain_data/application-go/connect.go | 27 +- .../contract/{asset.go => model.go} | 0 off_chain_data/application-go/listen.go | 270 +----------------- .../parser/{blockParser.go => block.go} | 0 .../{blockParser_test.go => block_test.go} | 0 .../application-go/processor/block.go | 114 ++++++++ .../application-go/processor/transaction.go | 73 +++++ .../application-go/store/flatFille.go | 76 +++++ off_chain_data/application-go/store/model.go | 25 ++ off_chain_data/application-go/transact.go | 1 + off_chain_data/application-go/utils/utils.go | 9 + 11 files changed, 320 insertions(+), 275 deletions(-) rename off_chain_data/application-go/contract/{asset.go => model.go} (100%) rename off_chain_data/application-go/parser/{blockParser.go => block.go} (100%) rename off_chain_data/application-go/parser/{blockParser_test.go => block_test.go} (100%) create mode 100644 off_chain_data/application-go/processor/block.go create mode 100644 off_chain_data/application-go/processor/transaction.go create mode 100644 off_chain_data/application-go/store/flatFille.go create mode 100644 off_chain_data/application-go/store/model.go diff --git a/off_chain_data/application-go/connect.go b/off_chain_data/application-go/connect.go index 91a69676..728e9888 100644 --- a/off_chain_data/application-go/connect.go +++ b/off_chain_data/application-go/connect.go @@ -9,6 +9,7 @@ package main import ( "crypto/x509" "fmt" + "offChainData/utils" "os" "path" "time" @@ -23,37 +24,29 @@ import ( const peerName = "peer0.org1.example.com" var ( - channelName = envOrDefault("CHANNEL_NAME", "mychannel") - chaincodeName = envOrDefault("CHAINCODE_NAME", "basic") - mspID = envOrDefault("MSP_ID", "Org1MSP") + channelName = utils.EnvOrDefault("CHANNEL_NAME", "mychannel") + chaincodeName = utils.EnvOrDefault("CHAINCODE_NAME", "basic") + mspID = utils.EnvOrDefault("MSP_ID", "Org1MSP") // Path to crypto materials. - cryptoPath = envOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com") + cryptoPath = utils.EnvOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com") // Path to user private key directory. - keyDirectoryPath = envOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore") + keyDirectoryPath = utils.EnvOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore") // Path to user certificate. - certPath = envOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem") + certPath = utils.EnvOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem") // Path to peer tls certificate. - tlsCertPath = envOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt") + tlsCertPath = utils.EnvOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt") // Gateway peer endpoint. - peerEndpoint = envOrDefault("PEER_ENDPOINT", "dns:///localhost:7051") + peerEndpoint = utils.EnvOrDefault("PEER_ENDPOINT", "dns:///localhost:7051") // Gateway peer SSL host name override. - peerHostAlias = envOrDefault("PEER_HOST_ALIAS", peerName) + peerHostAlias = utils.EnvOrDefault("PEER_HOST_ALIAS", peerName) ) -func envOrDefault(key, defaultValue string) string { - result := os.Getenv(key) - if result == "" { - return defaultValue - } - return result -} - func newGrpcConnection() *grpc.ClientConn { certificatePEM, err := os.ReadFile(tlsCertPath) if err != nil { diff --git a/off_chain_data/application-go/contract/asset.go b/off_chain_data/application-go/contract/model.go similarity index 100% rename from off_chain_data/application-go/contract/asset.go rename to off_chain_data/application-go/contract/model.go diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index 80e6a268..4a974e78 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -2,96 +2,16 @@ package main import ( "context" - "encoding/json" - "errors" "fmt" - "math" "offChainData/parser" - "os" - "slices" - "strconv" - "strings" + "offChainData/processor" + "offChainData/store" + "offChainData/utils" "github.com/hyperledger/fabric-gateway/pkg/client" "google.golang.org/grpc" ) -var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json") -var storeFile = envOrDefault("STORE_FILE", "store.log") -var simulatedFailureCount = getSimulatedFailureCount() - -const startBlock = 0 - -var transactionCount uint = 0 // Used only to simulate failures - -// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -type store = func(data ledgerUpdate) - -// Ledger update made by a specific transaction. -type ledgerUpdate struct { - blockNumber uint64 - transactionId string - writes []write -} - -// Description of a ledger write that can be applied to an off-chain data store. -type write struct { - // Channel whose ledger is being updated. - ChannelName string `json:"channelName"` - // Namespace within the ledger. - Namespace string `json:"namespace"` - // Key name within the ledger namespace. - Key string `json:"key"` - // Whether the key and associated value are being deleted. - IsDelete bool `json:"isDelete"` - // If `isDelete` is false, the Value written to the key; otherwise ignored. - Value string `json:"value"` -} - -// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -// This implementation just writes to a file. -var applyWritesToOffChainStore = func(data ledgerUpdate) { - if err := simulateFailureIfRequired(); err != nil { - fmt.Println("[expected error]: " + err.Error()) - return - } - - writes := []string{} - for _, write := range data.writes { - marshaled, err := json.Marshal(write) - if err != nil { - panic(err) - } - - writes = append(writes, string(marshaled)) - } - - f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - panic(err) - } - - if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { - f.Close() - panic(err) - } - - if err := f.Close(); err != nil { - panic(err) - } -} - -func simulateFailureIfRequired() error { - if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount { - transactionCount = 0 - return errors.New("simulated write failure") - } - - transactionCount += 1 - - return nil -} - func listen(clientConnection *grpc.ClientConn) { id, options := newConnectOptions(clientConnection) gateway, err := client.Connect(id, options...) @@ -100,8 +20,7 @@ func listen(clientConnection *grpc.ClientConn) { } defer gateway.Close() - network := gateway.GetNetwork(channelName) - + checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json") checkpointer, err := client.NewFileCheckpointer(checkpointFile) if err != nil { panic(err) @@ -110,196 +29,31 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Println("Start event listening from block", checkpointer.BlockNumber()) fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) - if simulatedFailureCount > 0 { - fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount) + if store.SimulatedFailureCount > 0 { + fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) } // TODO put into infinite loop like in public docs example ctx, cancel := context.WithCancel(context.Background()) defer cancel() + network := gateway.GetNetwork(channelName) blocks, err := network.BlockEvents( ctx, client.WithCheckpoint(checkpointer), - client.WithStartBlock(startBlock), // Used only if there is no checkpoint block number + client.WithStartBlock(0), // Used only if there is no checkpoint block number ) if err != nil { panic(err) } for blockProto := range blocks { - aBlockProcessor := blockProcessor{ + blockProcessor := processor.NewBlock( parser.ParseBlock(blockProto), checkpointer, - applyWritesToOffChainStore, - } - aBlockProcessor.process() - } -} - -type blockProcessor struct { - block *parser.Block - checkpointer *client.FileCheckpointer - storeWrites store -} - -func (b *blockProcessor) process() { - blockNumber := b.block.Number() - - fmt.Println("Received block", blockNumber) - - for _, transaction := range b.validTransactions() { - aTransactionProcessor := transactionProcessor{ - blockNumber, - transaction, - b.storeWrites, - } - aTransactionProcessor.process() - - transactionId := transaction.ChannelHeader().GetTxId() - b.checkpointer.CheckpointTransaction(blockNumber, transactionId) - } - - b.checkpointer.CheckpointBlock(b.block.Number()) -} - -func (b blockProcessor) validTransactions() []*parser.Transaction { - result := []*parser.Transaction{} - for _, transaction := range b.getNewTransactions() { - if transaction.IsValid() { - result = append(result, transaction) - } - } - return result -} - -func (b *blockProcessor) getNewTransactions() []*parser.Transaction { - transactions := b.block.Transactions() - - lastTransactionId := b.checkpointer.TransactionID() - if lastTransactionId == "" { - // No previously processed transactions within this block so all are new - return transactions - } - - // Ignore transactions up to the last processed transaction ID - lastProcessedIndex := b.findLastProcessedIndex() - return transactions[lastProcessedIndex+1:] -} - -func (b blockProcessor) findLastProcessedIndex() int { - blockTransactionIds := []string{} - for _, transaction := range b.block.Transactions() { - blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) - } - - lastTransactionId := b.checkpointer.TransactionID() - lastProcessedIndex := -1 - for index, id := range blockTransactionIds { - if id == lastTransactionId { - lastProcessedIndex = index - } - } - if lastProcessedIndex < 0 { - panic( - fmt.Errorf( - "checkpoint transaction ID %s not found in block %d containing transactions: %s", - lastTransactionId, - b.block.Number(), - joinByComma(blockTransactionIds), - ), + store.ApplyWritesToOffChainStore, + channelName, ) + blockProcessor.Process() } - return lastProcessedIndex -} - -func joinByComma(list []string) string { - result := "" - for index, item := range list { - if len(list)-1 == index { - result += item - } else { - result += item + ", " - } - } - return result -} - -func getSimulatedFailureCount() uint { - valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") - valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) - if err != nil { - panic(err) - } - - result := math.Floor(valueAsFloat) - if valueAsFloat < 0 { - panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) - } - - return uint(result) -} - -type transactionProcessor struct { - blockNumber uint64 - transaction *parser.Transaction - storeWrites store -} - -func (t *transactionProcessor) process() { - transactionId := t.transaction.ChannelHeader().GetTxId() - - writes := t.writes() - if len(writes) == 0 { - fmt.Println("Skipping read-only or system transaction", transactionId) - return - } - - fmt.Println("Process transaction", transactionId) - - t.storeWrites(ledgerUpdate{ - t.blockNumber, - transactionId, - writes, - }) -} - -func (t *transactionProcessor) writes() []write { - channelName = t.transaction.ChannelHeader().GetChannelId() - - nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{} - for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { - if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { - nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) - } - } - - writes := []write{} - for _, readWriteSet := range nonSystemCCReadWriteSets { - namespace := readWriteSet.Namespace() - - for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { - writes = append(writes, write{ - channelName, - namespace, - kvWrite.GetKey(), - kvWrite.GetIsDelete(), - string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output - }) - } - } - - return writes -} - -func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { - systemChaincodeNames := []string{ - "_lifecycle", - "cscc", - "escc", - "lscc", - "qscc", - "vscc", - } - return slices.Contains(systemChaincodeNames, chaincodeName) } diff --git a/off_chain_data/application-go/parser/blockParser.go b/off_chain_data/application-go/parser/block.go similarity index 100% rename from off_chain_data/application-go/parser/blockParser.go rename to off_chain_data/application-go/parser/block.go diff --git a/off_chain_data/application-go/parser/blockParser_test.go b/off_chain_data/application-go/parser/block_test.go similarity index 100% rename from off_chain_data/application-go/parser/blockParser_test.go rename to off_chain_data/application-go/parser/block_test.go diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go new file mode 100644 index 00000000..13cc0759 --- /dev/null +++ b/off_chain_data/application-go/processor/block.go @@ -0,0 +1,114 @@ +package processor + +import ( + "fmt" + "offChainData/parser" + "offChainData/store" + + "github.com/hyperledger/fabric-gateway/pkg/client" +) + +type block struct { + block *parser.Block + checkpointer *client.FileCheckpointer + writeToStore store.Writer + channelName string +} + +func NewBlock( + _block *parser.Block, + checkpointer *client.FileCheckpointer, + writeToStore store.Writer, + channelName string, +) *block { + return &block{ + _block, + checkpointer, + writeToStore, + channelName, + } +} + +func (b *block) Process() { + blockNumber := b.block.Number() + + fmt.Println("\nReceived block", blockNumber) + + for _, transaction := range b.validTransactions() { + aTransactionProcessor := transactionProcessor{ + blockNumber, + transaction, + // TODO use pointer to parent and get blockNumber, store and channelName from parent + b.writeToStore, + b.channelName, + } + aTransactionProcessor.process() + + transactionId := transaction.ChannelHeader().GetTxId() + b.checkpointer.CheckpointTransaction(blockNumber, transactionId) + } + + b.checkpointer.CheckpointBlock(b.block.Number()) +} + +func (b *block) validTransactions() []*parser.Transaction { + result := []*parser.Transaction{} + for _, transaction := range b.getNewTransactions() { + if transaction.IsValid() { + result = append(result, transaction) + } + } + return result +} + +func (b *block) getNewTransactions() []*parser.Transaction { + transactions := b.block.Transactions() + + lastTransactionId := b.checkpointer.TransactionID() + if lastTransactionId == "" { + // No previously processed transactions within this block so all are new + return transactions + } + + // Ignore transactions up to the last processed transaction ID + lastProcessedIndex := b.findLastProcessedIndex() + return transactions[lastProcessedIndex+1:] +} + +func (b *block) findLastProcessedIndex() int { + blockTransactionIds := []string{} + for _, transaction := range b.block.Transactions() { + blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) + } + + lastTransactionId := b.checkpointer.TransactionID() + lastProcessedIndex := -1 + for index, id := range blockTransactionIds { + if id == lastTransactionId { + lastProcessedIndex = index + } + } + if lastProcessedIndex < 0 { + panic( + fmt.Errorf( + "checkpoint transaction ID %s not found in block %d containing transactions: %s", + lastTransactionId, + b.block.Number(), + b.joinByComma(blockTransactionIds), + ), + ) + } + return lastProcessedIndex +} + +func (b *block) joinByComma(list []string) string { + result := "" + for index, item := range list { + if len(list)-1 == index { + result += item + } else { + result += item + ", " + } + } + return result +} diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go new file mode 100644 index 00000000..99d965d3 --- /dev/null +++ b/off_chain_data/application-go/processor/transaction.go @@ -0,0 +1,73 @@ +package processor + +import ( + "fmt" + "offChainData/parser" + "offChainData/store" + "slices" +) + +type transactionProcessor struct { + blockNumber uint64 + transaction *parser.Transaction + writeToStore store.Writer + channelName string +} + +func (t *transactionProcessor) process() { + transactionId := t.transaction.ChannelHeader().GetTxId() + + writes := t.writes() + if len(writes) == 0 { + fmt.Println("Skipping read-only or system transaction", transactionId) + return + } + + fmt.Println("Process transaction", transactionId) + + t.writeToStore(store.LedgerUpdate{ + BlockNumber: t.blockNumber, + TransactionId: transactionId, + Writes: writes, + }) +} + +func (t *transactionProcessor) writes() []store.Write { + t.channelName = t.transaction.ChannelHeader().GetChannelId() + + nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{} + for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { + if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { + nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) + } + } + + writes := []store.Write{} + for _, readWriteSet := range nonSystemCCReadWriteSets { + namespace := readWriteSet.Namespace() + + for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { + writes = append(writes, store.Write{ + ChannelName: t.channelName, + Namespace: namespace, + Key: kvWrite.GetKey(), + IsDelete: kvWrite.GetIsDelete(), + Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output + }) + } + } + + return writes +} + +func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { + systemChaincodeNames := []string{ + "_lifecycle", + "cscc", + "escc", + "lscc", + "qscc", + "vscc", + } + return slices.Contains(systemChaincodeNames, chaincodeName) +} diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go new file mode 100644 index 00000000..b05379ac --- /dev/null +++ b/off_chain_data/application-go/store/flatFille.go @@ -0,0 +1,76 @@ +package store + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "offChainData/utils" + "os" + "strconv" + "strings" +) + +var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log") +var SimulatedFailureCount = getSimulatedFailureCount() +var transactionCount uint = 0 // Used only to simulate failures + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +// This implementation just writes to a file. +func ApplyWritesToOffChainStore(data LedgerUpdate) { + if err := simulateFailureIfRequired(); err != nil { + fmt.Println("[expected error]: " + err.Error()) + return + } + + writes := []string{} + for _, write := range data.Writes { + // TODO write also the TxID and block number so that you can compare easier to the output + marshaled, err := json.Marshal(write) + if err != nil { + panic(err) + } + + writes = append(writes, string(marshaled)) + } + + f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + + if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { + f.Close() + panic(err) + } + + if err := f.Close(); err != nil { + panic(err) + } +} + +func simulateFailureIfRequired() error { + if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { + transactionCount = 0 + return errors.New("simulated write failure") + } + + transactionCount += 1 + + return nil +} + +func getSimulatedFailureCount() uint { + valueAsString := utils.EnvOrDefault("SIMULATED_FAILURE_COUNT", "0") + valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) + if err != nil { + panic(err) + } + + result := math.Floor(valueAsFloat) + if valueAsFloat < 0 { + panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) + } + + return uint(result) +} diff --git a/off_chain_data/application-go/store/model.go b/off_chain_data/application-go/store/model.go new file mode 100644 index 00000000..1a6e8f33 --- /dev/null +++ b/off_chain_data/application-go/store/model.go @@ -0,0 +1,25 @@ +package store + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type Writer = func(data LedgerUpdate) + +// Ledger update made by a specific transaction. +type LedgerUpdate struct { + BlockNumber uint64 + TransactionId string + Writes []Write +} + +// Description of a ledger Write that can be applied to an off-chain data store. +type Write struct { + // Channel whose ledger is being updated. + ChannelName string `json:"channelName"` + // Namespace within the ledger. + Namespace string `json:"namespace"` + // Key name within the ledger namespace. + Key string `json:"key"` + // Whether the key and associated value are being deleted. + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` +} diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go index 0735e5ae..0b388f16 100644 --- a/off_chain_data/application-go/transact.go +++ b/off_chain_data/application-go/transact.go @@ -44,6 +44,7 @@ func (t *transactApp) transact() { anAsset := atb.NewAsset() t.smartContract.CreateAsset(anAsset) + // TODO print txID to compare easier with block processing fmt.Println("Created asset", anAsset.ID) // Transfer randomly 1 in 2 assets to a new owner. diff --git a/off_chain_data/application-go/utils/utils.go b/off_chain_data/application-go/utils/utils.go index 940f6b2c..2700c860 100644 --- a/off_chain_data/application-go/utils/utils.go +++ b/off_chain_data/application-go/utils/utils.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "errors" "math/big" + "os" ) // Pick a random element from an array. @@ -55,3 +56,11 @@ func Cache[T any](f func() T) func() T { return value.(T) } } + +func EnvOrDefault(key, defaultValue string) string { + result := os.Getenv(key) + if result == "" { + return defaultValue + } + return result +}