diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index daa9515d..c25f64a7 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -2,18 +2,28 @@ package main import ( "context" + "encoding/json" + "errors" "fmt" "math" "offChainData/parser" + "os" "slices" "strconv" + "strings" "github.com/hyperledger/fabric-gateway/pkg/client" "google.golang.org/grpc" ) var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json") +var storeFile = envOrDefault("STORE_FILE", "store.log") var simulatedFailureCount = getSimulatedFailureCount() + +const startBlock = 0 + +var transactionCount uint = 0 // Used only to simulate failures + // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. type store = func(data ledgerUpdate) @@ -27,20 +37,59 @@ type ledgerUpdate struct { // Description of a ledger write that can be applied to an off-chain data store. type write struct { // Channel whose ledger is being updated. - channelName string + ChannelName string `json:"channelName"` // Namespace within the ledger. - namespace string + Namespace string `json:"namespace"` // Key name within the ledger namespace. - key string + Key string `json:"key"` // Whether the key and associated value are being deleted. - isDelete bool - // If `isDelete` is false, the value written to the key; otherwise ignored. - value []byte + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` } // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // This implementation just writes to a file. var applyWritesToOffChainStore = func(data ledgerUpdate) { + if err := simulateFailureIfRequired(); err != nil { + fmt.Println("[expected error]: " + err.Error()) + return + } + + writes := []string{} + for _, write := range data.writes { + marshaled, err := json.Marshal(write) + if err != nil { + panic(err) + } + + writes = append(writes, string(marshaled)) + } + + f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + + if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { + f.Close() + panic(err) + } + + if err := f.Close(); err != nil { + panic(err) + } +} + +func simulateFailureIfRequired() error { + if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount { + transactionCount = 0 + return errors.New("simulated write failure") + } + + transactionCount += 1 + + return nil } func listen(clientConnection *grpc.ClientConn) { @@ -62,7 +111,7 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Printf("Start event listening from block %d\n", checkpointer.BlockNumber()) fmt.Printf("Last processed transaction ID within block: %s\n", checkpointer.TransactionID()) if simulatedFailureCount > 0 { - fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount) + fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount) } // TODO put into infinite loop like in public docs example @@ -71,8 +120,8 @@ func listen(clientConnection *grpc.ClientConn) { blocks, err := network.BlockEvents( ctx, - client.WithStartBlock(0), client.WithCheckpoint(checkpointer), + client.WithStartBlock(startBlock), // Used only if there is no checkpoint block number ) if err != nil { panic(err) @@ -200,8 +249,6 @@ type transactionProcessor struct { func (t *transactionProcessor) process() { transactionId := t.transaction.ChannelHeader().GetTxId() - fmt.Println("Process transaction", transactionId) - writes := t.writes() if len(writes) == 0 { fmt.Println("Skipping read-only or system transaction", transactionId) @@ -232,14 +279,13 @@ func (t *transactionProcessor) writes() []write { namespace := readWriteSet.Namespace() for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { - aWrite := write{ + writes = append(writes, write{ channelName, namespace, kvWrite.GetKey(), kvWrite.GetIsDelete(), - kvWrite.GetValue(), - } - writes = append(writes, aWrite) + string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output + }) } } diff --git a/off_chain_data/application-go/transact.go b/off_chain_data/application-go/transact.go index 93ca3398..a13ae059 100644 --- a/off_chain_data/application-go/transact.go +++ b/off_chain_data/application-go/transact.go @@ -36,7 +36,7 @@ func newTransactApp(smartContract *atb.AssetTransferBasic) *transactApp { func (t *transactApp) run() { for i := 0; i < int(t.batchSize); i++ { - go t.transact() + t.transact() } }