diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index 93491d5a..58d799d4 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "slices" + "strconv" "strings" "syscall" @@ -35,12 +36,15 @@ func listen(clientConnection grpc.ClientConnInterface) error { checkpointer.Close() fmt.Println("Checkpointer closed.") }() - fmt.Println("Start event listening from block", checkpointer.BlockNumber()) fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) + + simulatedFailureCount := initSimulatedFailureCount() if simulatedFailureCount > 0 { fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount) } + storeFile := envOrDefault("STORE_FILE", "store.log") + offChainStore := newOffChainStore(storeFile, simulatedFailureCount) ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer func() { @@ -65,7 +69,7 @@ func listen(clientConnection grpc.ClientConnInterface) error { aBlockProcessor := blockProcessor{ parser.ParseBlock(blockProto), checkpointer, - applyWritesToOffChainStore, + offChainStore, } if err := aBlockProcessor.process(); err != nil { @@ -77,10 +81,46 @@ func listen(clientConnection grpc.ClientConnInterface) error { return nil } +func initSimulatedFailureCount() uint { + valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") + result, err := strconv.ParseUint(valueAsString, 10, 0) + if err != nil { + panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) + } + + return uint(result) +} + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type store interface { + write(ledgerUpdate) error +} + +// Ledger update made by a specific transaction. +type ledgerUpdate struct { + BlockNumber uint64 + TransactionID string + Writes []write +} + +// Description of a ledger Write that can be applied to an off-chain data store. +type write struct { + // Channel whose ledger is being updated. + ChannelName string `json:"channelName"` + // Namespace within the ledger. + Namespace string `json:"namespace"` + // Key name within the ledger namespace. + Key string `json:"key"` + // Whether the key and associated value are being deleted. + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` +} + type blockProcessor struct { parsedBlock *parser.Block checkpointer *client.FileCheckpointer - writeToStore writer + store store } func (b *blockProcessor) process() error { @@ -95,7 +135,7 @@ func (b *blockProcessor) process() error { txProcessor := transactionProcessor{ b.parsedBlock.Number(), validTransaction, - b.writeToStore, + b.store, } if err := txProcessor.process(); err != nil { return err @@ -182,9 +222,9 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) { } type transactionProcessor struct { - blockNumber uint64 - transaction *parser.Transaction - writeToStore writer + blockNumber uint64 + transaction *parser.Transaction + store store } func (t *transactionProcessor) process() error { @@ -201,8 +241,7 @@ func (t *transactionProcessor) process() error { } fmt.Println("Process transaction", transactionID) - - if err := t.writeToStore(ledgerUpdate{ + if err := t.store.write(ledgerUpdate{ BlockNumber: t.blockNumber, TransactionID: transactionID, Writes: writes, diff --git a/off_chain_data/application-go/store.go b/off_chain_data/application-go/store.go index 704e7632..6af565a4 100644 --- a/off_chain_data/application-go/store.go +++ b/off_chain_data/application-go/store.go @@ -5,63 +5,84 @@ import ( "errors" "fmt" "os" - "strconv" - "strings" ) -var storeFile = envOrDefault("STORE_FILE", "store.log") -var simulatedFailureCount = getSimulatedFailureCount() -var transactionCount uint = 0 // Used only to simulate failures +var errExpected = errors.New("expected error: simulated write failure") -// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -type writer = func(ledgerUpdate) error - -// Ledger update made by a specific transaction. -type ledgerUpdate struct { - BlockNumber uint64 - TransactionID string - Writes []write +type offChainStore struct { + writes, path string + simulatedFailureCount, transactionCount uint } -// Description of a ledger Write that can be applied to an off-chain data store. -type write struct { - // Channel whose ledger is being updated. - ChannelName string `json:"channelName"` - // Namespace within the ledger. - Namespace string `json:"namespace"` - // Key name within the ledger namespace. - Key string `json:"key"` - // Whether the key and associated value are being deleted. - IsDelete bool `json:"isDelete"` - // If `isDelete` is false, the Value written to the key; otherwise ignored. - Value string `json:"value"` +func newOffChainStore(path string, simulatedFailureCount uint) *offChainStore { + return &offChainStore{ + "", + path, + uint(simulatedFailureCount), + 0, + } } // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // This implementation just writes to a file. -func applyWritesToOffChainStore(data ledgerUpdate) error { - if err := simulateFailureIfRequired(); err != nil { +func (ocs *offChainStore) write(data ledgerUpdate) error { + if err := ocs.simulateFailureIfRequired(); err != nil { return err } - writes := []string{} - for _, write := range data.Writes { + ocs.clearLastWrites() + + if err := ocs.marshal(data.Writes); err != nil { + return err + } + + if err := ocs.persist(); err != nil { + return err + } + + return nil +} + +func (ocs *offChainStore) simulateFailureIfRequired() error { + if ocs.simulatedFailureCount > 0 && ocs.transactionCount >= ocs.simulatedFailureCount { + ocs.transactionCount = 0 + return errExpected + } + + ocs.transactionCount += 1 + + return nil +} + +func (ocs *offChainStore) clearLastWrites() { + ocs.writes = "" +} + +func (ocs *offChainStore) marshal(writes []write) error { + for _, write := range writes { marshaled, err := json.Marshal(write) if err != nil { return err } - writes = append(writes, string(marshaled)) + ocs.writes += string(marshaled) + "\n" } - f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + return nil +} + +func (ocs *offChainStore) persist() error { + f, err := os.OpenFile(ocs.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } - if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { - f.Close() - return err + if _, writeErr := f.Write([]byte(ocs.writes)); writeErr != nil { + if closeErr := f.Close(); closeErr != nil { + return fmt.Errorf("write error: %v, close error: %v", writeErr, closeErr) + } + + return writeErr } if err := f.Close(); err != nil { @@ -70,26 +91,3 @@ func applyWritesToOffChainStore(data ledgerUpdate) error { return nil } - -var errExpected = errors.New("expected error: simulated write failure") - -func simulateFailureIfRequired() error { - if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount { - transactionCount = 0 - return errExpected - } - - transactionCount += 1 - - return nil -} - -func getSimulatedFailureCount() uint { - valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") - result, err := strconv.ParseUint(valueAsString, 10, 0) - if err != nil { - panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) - } - - return uint(result) -}