diff --git a/off_chain_data/application-go/connect.go b/off_chain_data/application-go/connect.go index 91a69676..728e9888 100644 --- a/off_chain_data/application-go/connect.go +++ b/off_chain_data/application-go/connect.go @@ -9,6 +9,7 @@ package main import ( "crypto/x509" "fmt" + "offChainData/utils" "os" "path" "time" @@ -23,37 +24,29 @@ import ( const peerName = "peer0.org1.example.com" var ( - channelName = envOrDefault("CHANNEL_NAME", "mychannel") - chaincodeName = envOrDefault("CHAINCODE_NAME", "basic") - mspID = envOrDefault("MSP_ID", "Org1MSP") + channelName = utils.EnvOrDefault("CHANNEL_NAME", "mychannel") + chaincodeName = utils.EnvOrDefault("CHAINCODE_NAME", "basic") + mspID = utils.EnvOrDefault("MSP_ID", "Org1MSP") // Path to crypto materials. - cryptoPath = envOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com") + cryptoPath = utils.EnvOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com") // Path to user private key directory. - keyDirectoryPath = envOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore") + keyDirectoryPath = utils.EnvOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore") // Path to user certificate. - certPath = envOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem") + certPath = utils.EnvOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem") // Path to peer tls certificate. - tlsCertPath = envOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt") + tlsCertPath = utils.EnvOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt") // Gateway peer endpoint. - peerEndpoint = envOrDefault("PEER_ENDPOINT", "dns:///localhost:7051") + peerEndpoint = utils.EnvOrDefault("PEER_ENDPOINT", "dns:///localhost:7051") // Gateway peer SSL host name override. - peerHostAlias = envOrDefault("PEER_HOST_ALIAS", peerName) + peerHostAlias = utils.EnvOrDefault("PEER_HOST_ALIAS", peerName) ) -func envOrDefault(key, defaultValue string) string { - result := os.Getenv(key) - if result == "" { - return defaultValue - } - return result -} - func newGrpcConnection() *grpc.ClientConn { certificatePEM, err := os.ReadFile(tlsCertPath) if err != nil { diff --git a/off_chain_data/application-go/contract/asset.go b/off_chain_data/application-go/contract/model.go similarity index 100% rename from off_chain_data/application-go/contract/asset.go rename to off_chain_data/application-go/contract/model.go diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index 80e6a268..4a974e78 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -2,96 +2,16 @@ package main import ( "context" - "encoding/json" - "errors" "fmt" - "math" "offChainData/parser" - "os" - "slices" - "strconv" - "strings" + "offChainData/processor" + "offChainData/store" + "offChainData/utils" "github.com/hyperledger/fabric-gateway/pkg/client" "google.golang.org/grpc" ) -var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json") -var storeFile = envOrDefault("STORE_FILE", "store.log") -var simulatedFailureCount = getSimulatedFailureCount() - -const startBlock = 0 - -var transactionCount uint = 0 // Used only to simulate failures - -// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -type store = func(data ledgerUpdate) - -// Ledger update made by a specific transaction. -type ledgerUpdate struct { - blockNumber uint64 - transactionId string - writes []write -} - -// Description of a ledger write that can be applied to an off-chain data store. -type write struct { - // Channel whose ledger is being updated. - ChannelName string `json:"channelName"` - // Namespace within the ledger. - Namespace string `json:"namespace"` - // Key name within the ledger namespace. - Key string `json:"key"` - // Whether the key and associated value are being deleted. - IsDelete bool `json:"isDelete"` - // If `isDelete` is false, the Value written to the key; otherwise ignored. - Value string `json:"value"` -} - -// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -// This implementation just writes to a file. -var applyWritesToOffChainStore = func(data ledgerUpdate) { - if err := simulateFailureIfRequired(); err != nil { - fmt.Println("[expected error]: " + err.Error()) - return - } - - writes := []string{} - for _, write := range data.writes { - marshaled, err := json.Marshal(write) - if err != nil { - panic(err) - } - - writes = append(writes, string(marshaled)) - } - - f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - panic(err) - } - - if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { - f.Close() - panic(err) - } - - if err := f.Close(); err != nil { - panic(err) - } -} - -func simulateFailureIfRequired() error { - if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount { - transactionCount = 0 - return errors.New("simulated write failure") - } - - transactionCount += 1 - - return nil -} - func listen(clientConnection *grpc.ClientConn) { id, options := newConnectOptions(clientConnection) gateway, err := client.Connect(id, options...) @@ -100,8 +20,7 @@ func listen(clientConnection *grpc.ClientConn) { } defer gateway.Close() - network := gateway.GetNetwork(channelName) - + checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json") checkpointer, err := client.NewFileCheckpointer(checkpointFile) if err != nil { panic(err) @@ -110,196 +29,31 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Println("Start event listening from block", checkpointer.BlockNumber()) fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) - if simulatedFailureCount > 0 { - fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount) + if store.SimulatedFailureCount > 0 { + fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) } // TODO put into infinite loop like in public docs example ctx, cancel := context.WithCancel(context.Background()) defer cancel() + network := gateway.GetNetwork(channelName) blocks, err := network.BlockEvents( ctx, client.WithCheckpoint(checkpointer), - client.WithStartBlock(startBlock), // Used only if there is no checkpoint block number + client.WithStartBlock(0), // Used only if there is no checkpoint block number ) if err != nil { panic(err) } for blockProto := range blocks { - aBlockProcessor := blockProcessor{ + blockProcessor := processor.NewBlock( parser.ParseBlock(blockProto), checkpointer, - applyWritesToOffChainStore, - } - aBlockProcessor.process() - } -} - -type blockProcessor struct { - block *parser.Block - checkpointer *client.FileCheckpointer - storeWrites store -} - -func (b *blockProcessor) process() { - blockNumber := b.block.Number() - - fmt.Println("Received block", blockNumber) - - for _, transaction := range b.validTransactions() { - aTransactionProcessor := transactionProcessor{ - blockNumber, - transaction, - b.storeWrites, - } - aTransactionProcessor.process() - - transactionId := transaction.ChannelHeader().GetTxId() - b.checkpointer.CheckpointTransaction(blockNumber, transactionId) - } - - b.checkpointer.CheckpointBlock(b.block.Number()) -} - -func (b blockProcessor) validTransactions() []*parser.Transaction { - result := []*parser.Transaction{} - for _, transaction := range b.getNewTransactions() { - if transaction.IsValid() { - result = append(result, transaction) - } - } - return result -} - -func (b *blockProcessor) getNewTransactions() []*parser.Transaction { - transactions := b.block.Transactions() - - lastTransactionId := b.checkpointer.TransactionID() - if lastTransactionId == "" { - // No previously processed transactions within this block so all are new - return transactions - } - - // Ignore transactions up to the last processed transaction ID - lastProcessedIndex := b.findLastProcessedIndex() - return transactions[lastProcessedIndex+1:] -} - -func (b blockProcessor) findLastProcessedIndex() int { - blockTransactionIds := []string{} - for _, transaction := range b.block.Transactions() { - blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) - } - - lastTransactionId := b.checkpointer.TransactionID() - lastProcessedIndex := -1 - for index, id := range blockTransactionIds { - if id == lastTransactionId { - lastProcessedIndex = index - } - } - if lastProcessedIndex < 0 { - panic( - fmt.Errorf( - "checkpoint transaction ID %s not found in block %d containing transactions: %s", - lastTransactionId, - b.block.Number(), - joinByComma(blockTransactionIds), - ), + store.ApplyWritesToOffChainStore, + channelName, ) + blockProcessor.Process() } - return lastProcessedIndex -} - -func joinByComma(list []string) string { - result := "" - for index, item := range list { - if len(list)-1 == index { - result += item - } else { - result += item + ", " - } - } - return result -} - -func getSimulatedFailureCount() uint { - valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") - valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) - if err != nil { - panic(err) - } - - result := math.Floor(valueAsFloat) - if valueAsFloat < 0 { - panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) - } - - return uint(result) -} - -type transactionProcessor struct { - blockNumber uint64 - transaction *parser.Transaction - storeWrites store -} - -func (t *transactionProcessor) process() { - transactionId := t.transaction.ChannelHeader().GetTxId() - - writes := t.writes() - if len(writes) == 0 { - fmt.Println("Skipping read-only or system transaction", transactionId) - return - } - - fmt.Println("Process transaction", transactionId) - - t.storeWrites(ledgerUpdate{ - t.blockNumber, - transactionId, - writes, - }) -} - -func (t *transactionProcessor) writes() []write { - channelName = t.transaction.ChannelHeader().GetChannelId() - - nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{} - for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { - if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { - nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) - } - } - - writes := []write{} - for _, readWriteSet := range nonSystemCCReadWriteSets { - namespace := readWriteSet.Namespace() - - for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { - writes = append(writes, write{ - channelName, - namespace, - kvWrite.GetKey(), - kvWrite.GetIsDelete(), - string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output - }) - } - } - - return writes -} - -func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { - systemChaincodeNames := []string{ - "_lifecycle", - "cscc", - "escc", - "lscc", - "qscc", - "vscc", - } - return slices.Contains(systemChaincodeNames, chaincodeName) } diff --git a/off_chain_data/application-go/parser/blockParser.go b/off_chain_data/application-go/parser/block.go similarity index 100% rename from off_chain_data/application-go/parser/blockParser.go rename to off_chain_data/application-go/parser/block.go diff --git a/off_chain_data/application-go/parser/blockParser_test.go b/off_chain_data/application-go/parser/block_test.go similarity index 100% rename from off_chain_data/application-go/parser/blockParser_test.go rename to off_chain_data/application-go/parser/block_test.go diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go new file mode 100644 index 00000000..13cc0759 --- /dev/null +++ b/off_chain_data/application-go/processor/block.go @@ -0,0 +1,114 @@ +package processor + +import ( + "fmt" + "offChainData/parser" + "offChainData/store" + + "github.com/hyperledger/fabric-gateway/pkg/client" +) + +type block struct { + block *parser.Block + checkpointer *client.FileCheckpointer + writeToStore store.Writer + channelName string +} + +func NewBlock( + _block *parser.Block, + checkpointer *client.FileCheckpointer, + writeToStore store.Writer, + channelName string, +) *block { + return &block{ + _block, + checkpointer, + writeToStore, + channelName, + } +} + +func (b *block) Process() { + blockNumber := b.block.Number() + + fmt.Println("\nReceived block", blockNumber) + + for _, transaction := range b.validTransactions() { + aTransactionProcessor := transactionProcessor{ + blockNumber, + transaction, + // TODO use pointer to parent and get blockNumber, store and channelName from parent + b.writeToStore, + b.channelName, + } + aTransactionProcessor.process() + + transactionId := transaction.ChannelHeader().GetTxId() + b.checkpointer.CheckpointTransaction(blockNumber, transactionId) + } + + b.checkpointer.CheckpointBlock(b.block.Number()) +} + +func (b *block) validTransactions() []*parser.Transaction { + result := []*parser.Transaction{} + for _, transaction := range b.getNewTransactions() { + if transaction.IsValid() { + result = append(result, transaction) + } + } + return result +} + +func (b *block) getNewTransactions() []*parser.Transaction { + transactions := b.block.Transactions() + + lastTransactionId := b.checkpointer.TransactionID() + if lastTransactionId == "" { + // No previously processed transactions within this block so all are new + return transactions + } + + // Ignore transactions up to the last processed transaction ID + lastProcessedIndex := b.findLastProcessedIndex() + return transactions[lastProcessedIndex+1:] +} + +func (b *block) findLastProcessedIndex() int { + blockTransactionIds := []string{} + for _, transaction := range b.block.Transactions() { + blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) + } + + lastTransactionId := b.checkpointer.TransactionID() + lastProcessedIndex := -1 + for index, id := range blockTransactionIds { + if id == lastTransactionId { + lastProcessedIndex = index + } + } + if lastProcessedIndex < 0 { + panic( + fmt.Errorf( + "checkpoint transaction ID %s not found in block %d containing transactions: %s", + lastTransactionId, + b.block.Number(), + b.joinByComma(blockTransactionIds), + ), + ) + } + return lastProcessedIndex +} + +func (b *block) joinByComma(list []string) string { + result := "" + for index, item := range list { + if len(list)-1 == index { + result += item + } else { + result += item + ", " + } + } + return result +} diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go new file mode 100644 index 00000000..99d965d3 --- /dev/null +++ b/off_chain_data/application-go/processor/transaction.go @@ -0,0 +1,73 @@ +package processor + +import ( + "fmt" + "offChainData/parser" + "offChainData/store" + "slices" +) + +type transactionProcessor struct { + blockNumber uint64 + transaction *parser.Transaction + writeToStore store.Writer + channelName string +} + +func (t *transactionProcessor) process() { + transactionId := t.transaction.ChannelHeader().GetTxId() + + writes := t.writes() + if len(writes) == 0 { + fmt.Println("Skipping read-only or system transaction", transactionId) + return + } + + fmt.Println("Process transaction", transactionId) + + t.writeToStore(store.LedgerUpdate{ + BlockNumber: t.blockNumber, + TransactionId: transactionId, + Writes: writes, + }) +} + +func (t *transactionProcessor) writes() []store.Write { + t.channelName = t.transaction.ChannelHeader().GetChannelId() + + nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{} + for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { + if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { + nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) + } + } + + writes := []store.Write{} + for _, readWriteSet := range nonSystemCCReadWriteSets { + namespace := readWriteSet.Namespace() + + for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { + writes = append(writes, store.Write{ + ChannelName: t.channelName, + Namespace: namespace, + Key: kvWrite.GetKey(), + IsDelete: kvWrite.GetIsDelete(), + Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output + }) + } + } + + return writes +} + +func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { + systemChaincodeNames := []string{ + "_lifecycle", + "cscc", + "escc", + "lscc", + "qscc", + "vscc", + } + return slices.Contains(systemChaincodeNames, chaincodeName) +} diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go new file mode 100644 index 00000000..b05379ac --- /dev/null +++ b/off_chain_data/application-go/store/flatFille.go @@ -0,0 +1,76 @@ +package store + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "offChainData/utils" + "os" + "strconv" + "strings" +) + +var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log") +var SimulatedFailureCount = getSimulatedFailureCount() +var transactionCount uint = 0 // Used only to simulate failures + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +// This implementation just writes to a file. +func ApplyWritesToOffChainStore(data LedgerUpdate) { + if err := simulateFailureIfRequired(); err != nil { + fmt.Println("[expected error]: " + err.Error()) + return + } + + writes := []string{} + for _, write := range data.Writes { + // TODO write also the TxID and block number so that you can compare easier to the output + marshaled, err := json.Marshal(write) + if err != nil { + panic(err) + } + + writes = append(writes, string(marshaled)) + } + + f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + + if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { + f.Close() + panic(err) + } + + if err := f.Close(); err != nil { + panic(err) + } +} + +func simulateFailureIfRequired() error { + if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { + transactionCount = 0 + return errors.New("simulated write failure") + } + + transactionCount += 1 + + return nil +} + +func getSimulatedFailureCount() uint { + valueAsString := utils.EnvOrDefault("SIMULATED_FAILURE_COUNT", "0") + valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) + if err != nil { + panic(err) + } + + result := math.Floor(valueAsFloat) + if valueAsFloat < 0 { + panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) + } + + return uint(result) +} diff --git a/off_chain_data/application-go/store/model.go b/off_chain_data/application-go/store/model.go new file mode 100644 index 00000000..1a6e8f33 --- /dev/null +++ b/off_chain_data/application-go/store/model.go @@ -0,0 +1,25 @@ +package store + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type Writer = func(data LedgerUpdate) + +// Ledger update made by a specific transaction. +type LedgerUpdate struct { + BlockNumber uint64 + TransactionId string + Writes []Write +} + +// Description of a ledger Write that can be applied to an off-chain data store. +type Write struct { + // Channel whose ledger is being updated. + ChannelName string `json:"channelName"` + // Namespace within the ledger. + Namespace string `json:"namespace"` + // Key name within the ledger namespace. + Key string `json:"key"` + // Whether the key and associated value are being deleted. + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` +} diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go index 0735e5ae..0b388f16 100644 --- a/off_chain_data/application-go/transact.go +++ b/off_chain_data/application-go/transact.go @@ -44,6 +44,7 @@ func (t *transactApp) transact() { anAsset := atb.NewAsset() t.smartContract.CreateAsset(anAsset) + // TODO print txID to compare easier with block processing fmt.Println("Created asset", anAsset.ID) // Transfer randomly 1 in 2 assets to a new owner. diff --git a/off_chain_data/application-go/utils/utils.go b/off_chain_data/application-go/utils/utils.go index 940f6b2c..2700c860 100644 --- a/off_chain_data/application-go/utils/utils.go +++ b/off_chain_data/application-go/utils/utils.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "errors" "math/big" + "os" ) // Pick a random element from an array. @@ -55,3 +56,11 @@ func Cache[T any](f func() T) func() T { return value.(T) } } + +func EnvOrDefault(key, defaultValue string) string { + result := os.Getenv(key) + if result == "" { + return defaultValue + } + return result +}