From fde0cd58cc9c1238c6d439d42fb9bb635562d240 Mon Sep 17 00:00:00 2001 From: Stanislav Jakuschevskij Date: Thu, 20 Feb 2025 19:12:29 +0100 Subject: [PATCH] Refactor store Add store interface and implement with offChainStore struct. Decompose storing into small chunks and keep state around storing writes and failure count. Move environment variables used for store setup into the setup phase of the listen function. Signed-off-by: Stanislav Jakuschevskij --- off_chain_data/application-go/listen.go | 57 ++++++++++-- off_chain_data/application-go/store.go | 112 ++++++++++++------------ 2 files changed, 103 insertions(+), 66 deletions(-) diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index 93491d5a..58d799d4 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "slices" + "strconv" "strings" "syscall" @@ -35,12 +36,15 @@ func listen(clientConnection grpc.ClientConnInterface) error { checkpointer.Close() fmt.Println("Checkpointer closed.") }() - fmt.Println("Start event listening from block", checkpointer.BlockNumber()) fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) + + simulatedFailureCount := initSimulatedFailureCount() if simulatedFailureCount > 0 { fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount) } + storeFile := envOrDefault("STORE_FILE", "store.log") + offChainStore := newOffChainStore(storeFile, simulatedFailureCount) ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer func() { @@ -65,7 +69,7 @@ func listen(clientConnection grpc.ClientConnInterface) error { aBlockProcessor := blockProcessor{ parser.ParseBlock(blockProto), checkpointer, - applyWritesToOffChainStore, + offChainStore, } if err := aBlockProcessor.process(); err != nil { @@ -77,10 +81,46 @@ func listen(clientConnection grpc.ClientConnInterface) error { return nil } +func initSimulatedFailureCount() uint { + valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") + result, err := strconv.ParseUint(valueAsString, 10, 0) + if err != nil { + panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) + } + + return uint(result) +} + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type store interface { + write(ledgerUpdate) error +} + +// Ledger update made by a specific transaction. +type ledgerUpdate struct { + BlockNumber uint64 + TransactionID string + Writes []write +} + +// Description of a ledger Write that can be applied to an off-chain data store. +type write struct { + // Channel whose ledger is being updated. + ChannelName string `json:"channelName"` + // Namespace within the ledger. + Namespace string `json:"namespace"` + // Key name within the ledger namespace. + Key string `json:"key"` + // Whether the key and associated value are being deleted. + IsDelete bool `json:"isDelete"` + // If `isDelete` is false, the Value written to the key; otherwise ignored. + Value string `json:"value"` +} + type blockProcessor struct { parsedBlock *parser.Block checkpointer *client.FileCheckpointer - writeToStore writer + store store } func (b *blockProcessor) process() error { @@ -95,7 +135,7 @@ func (b *blockProcessor) process() error { txProcessor := transactionProcessor{ b.parsedBlock.Number(), validTransaction, - b.writeToStore, + b.store, } if err := txProcessor.process(); err != nil { return err @@ -182,9 +222,9 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) { } type transactionProcessor struct { - blockNumber uint64 - transaction *parser.Transaction - writeToStore writer + blockNumber uint64 + transaction *parser.Transaction + store store } func (t *transactionProcessor) process() error { @@ -201,8 +241,7 @@ func (t *transactionProcessor) process() error { } fmt.Println("Process transaction", transactionID) - - if err := t.writeToStore(ledgerUpdate{ + if err := t.store.write(ledgerUpdate{ BlockNumber: t.blockNumber, TransactionID: transactionID, Writes: writes, diff --git a/off_chain_data/application-go/store.go b/off_chain_data/application-go/store.go index 704e7632..6af565a4 100644 --- a/off_chain_data/application-go/store.go +++ b/off_chain_data/application-go/store.go @@ -5,63 +5,84 @@ import ( "errors" "fmt" "os" - "strconv" - "strings" ) -var storeFile = envOrDefault("STORE_FILE", "store.log") -var simulatedFailureCount = getSimulatedFailureCount() -var transactionCount uint = 0 // Used only to simulate failures +var errExpected = errors.New("expected error: simulated write failure") -// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -type writer = func(ledgerUpdate) error - -// Ledger update made by a specific transaction. -type ledgerUpdate struct { - BlockNumber uint64 - TransactionID string - Writes []write +type offChainStore struct { + writes, path string + simulatedFailureCount, transactionCount uint } -// Description of a ledger Write that can be applied to an off-chain data store. -type write struct { - // Channel whose ledger is being updated. - ChannelName string `json:"channelName"` - // Namespace within the ledger. - Namespace string `json:"namespace"` - // Key name within the ledger namespace. - Key string `json:"key"` - // Whether the key and associated value are being deleted. - IsDelete bool `json:"isDelete"` - // If `isDelete` is false, the Value written to the key; otherwise ignored. - Value string `json:"value"` +func newOffChainStore(path string, simulatedFailureCount uint) *offChainStore { + return &offChainStore{ + "", + path, + uint(simulatedFailureCount), + 0, + } } // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // This implementation just writes to a file. -func applyWritesToOffChainStore(data ledgerUpdate) error { - if err := simulateFailureIfRequired(); err != nil { +func (ocs *offChainStore) write(data ledgerUpdate) error { + if err := ocs.simulateFailureIfRequired(); err != nil { return err } - writes := []string{} - for _, write := range data.Writes { + ocs.clearLastWrites() + + if err := ocs.marshal(data.Writes); err != nil { + return err + } + + if err := ocs.persist(); err != nil { + return err + } + + return nil +} + +func (ocs *offChainStore) simulateFailureIfRequired() error { + if ocs.simulatedFailureCount > 0 && ocs.transactionCount >= ocs.simulatedFailureCount { + ocs.transactionCount = 0 + return errExpected + } + + ocs.transactionCount += 1 + + return nil +} + +func (ocs *offChainStore) clearLastWrites() { + ocs.writes = "" +} + +func (ocs *offChainStore) marshal(writes []write) error { + for _, write := range writes { marshaled, err := json.Marshal(write) if err != nil { return err } - writes = append(writes, string(marshaled)) + ocs.writes += string(marshaled) + "\n" } - f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + return nil +} + +func (ocs *offChainStore) persist() error { + f, err := os.OpenFile(ocs.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return err } - if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil { - f.Close() - return err + if _, writeErr := f.Write([]byte(ocs.writes)); writeErr != nil { + if closeErr := f.Close(); closeErr != nil { + return fmt.Errorf("write error: %v, close error: %v", writeErr, closeErr) + } + + return writeErr } if err := f.Close(); err != nil { @@ -70,26 +91,3 @@ func applyWritesToOffChainStore(data ledgerUpdate) error { return nil } - -var errExpected = errors.New("expected error: simulated write failure") - -func simulateFailureIfRequired() error { - if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount { - transactionCount = 0 - return errExpected - } - - transactionCount += 1 - - return nil -} - -func getSimulatedFailureCount() uint { - valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") - result, err := strconv.ParseUint(valueAsString, 10, 0) - if err != nil { - panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString)) - } - - return uint(result) -}