From 1364645c6cb126d1dae1fb0d7ce7a3108bfb3ac4 Mon Sep 17 00:00:00 2001 From: Stanislav Jakuschevskij Date: Fri, 3 Jan 2025 17:47:15 +0100 Subject: [PATCH] Fix simulated failure issue Before all transactions were processed and when the failure was simulated a message was printed and all the transactions still processed. Now the store returns an error when the failure is simulated which the listener expects so that it can gracefully shutdown the system and close the context. The context must be closed correctly or the checkpointer won't save the last processed transactionId to the file system. Signed-off-by: Stanislav Jakuschevskij --- off_chain_data/application-go/listen.go | 17 ++++++++++++----- .../application-go/processor/block.go | 8 ++++++-- .../application-go/processor/transaction.go | 11 +++++++---- .../application-go/store/flatFille.go | 11 ++++++----- off_chain_data/application-go/store/model.go | 2 +- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index d6200dba..d2b99ded 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -43,17 +43,20 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) } - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer func() { - stop() + close() fmt.Println("Context closed.") }() network := gateway.GetNetwork(channelName) blocks, err := network.BlockEvents( ctx, + // Used only if there is no checkpoint block number. + // Order matters. WithStartBlock must be set before + // WithCheckpoint to work. + client.WithStartBlock(0), client.WithCheckpoint(checkpointer), - client.WithStartBlock(0), // Used only if there is no checkpoint block number ) if err != nil { panic(err) @@ -76,11 +79,15 @@ func listen(clientConnection *grpc.ClientConn) { store.ApplyWritesToOffChainStore, channelName, ) - blockProcessor.Process() + + if err := blockProcessor.Process(); err == store.ErrExpected { + fmt.Println(err) + return + } } } }() wg.Wait() - fmt.Println("\nReceived 'SIGTERM' signal. Shutting down listener gracefully...") + fmt.Println("\nShutting down listener gracefully...") } diff --git a/off_chain_data/application-go/processor/block.go b/off_chain_data/application-go/processor/block.go index 3734b2cd..fcf44c9f 100644 --- a/off_chain_data/application-go/processor/block.go +++ b/off_chain_data/application-go/processor/block.go @@ -29,7 +29,7 @@ func NewBlock( } } -func (b *block) Process() { +func (b *block) Process() error { blockNumber := b.parsedBlock.Number() fmt.Println("\nReceived block", blockNumber) @@ -42,13 +42,17 @@ func (b *block) Process() { b.writeToStore, b.channelName, } - aTransaction.process() + if err := aTransaction.process(); err != nil { + return err + } transactionId := validTransaction.ChannelHeader().GetTxId() b.checkpointer.CheckpointTransaction(blockNumber, transactionId) } b.checkpointer.CheckpointBlock(b.parsedBlock.Number()) + + return nil } func (b *block) validTransactions() []*parser.Transaction { diff --git a/off_chain_data/application-go/processor/transaction.go b/off_chain_data/application-go/processor/transaction.go index 3ff78edd..5d705279 100644 --- a/off_chain_data/application-go/processor/transaction.go +++ b/off_chain_data/application-go/processor/transaction.go @@ -14,22 +14,25 @@ type transaction struct { channelName string } -func (t *transaction) process() { +func (t *transaction) process() error { transactionId := t.transaction.ChannelHeader().GetTxId() writes := t.writes() if len(writes) == 0 { fmt.Println("Skipping read-only or system transaction", transactionId) - return + return nil } fmt.Println("Process transaction", transactionId) - t.writeToStore(store.LedgerUpdate{ + if err := t.writeToStore(store.LedgerUpdate{ BlockNumber: t.blockNumber, TransactionId: transactionId, Writes: writes, - }) + }); err != nil { + return err + } + return nil } func (t *transaction) writes() []store.Write { diff --git a/off_chain_data/application-go/store/flatFille.go b/off_chain_data/application-go/store/flatFille.go index b05379ac..432099d3 100644 --- a/off_chain_data/application-go/store/flatFille.go +++ b/off_chain_data/application-go/store/flatFille.go @@ -14,18 +14,17 @@ import ( var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log") var SimulatedFailureCount = getSimulatedFailureCount() var transactionCount uint = 0 // Used only to simulate failures +var ErrExpected = errors.New("[expected error]: simulated write failure") // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. // This implementation just writes to a file. -func ApplyWritesToOffChainStore(data LedgerUpdate) { +func ApplyWritesToOffChainStore(data LedgerUpdate) error { if err := simulateFailureIfRequired(); err != nil { - fmt.Println("[expected error]: " + err.Error()) - return + return err } writes := []string{} for _, write := range data.Writes { - // TODO write also the TxID and block number so that you can compare easier to the output marshaled, err := json.Marshal(write) if err != nil { panic(err) @@ -47,12 +46,14 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) { if err := f.Close(); err != nil { panic(err) } + + return nil } func simulateFailureIfRequired() error { if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount { transactionCount = 0 - return errors.New("simulated write failure") + return ErrExpected } transactionCount += 1 diff --git a/off_chain_data/application-go/store/model.go b/off_chain_data/application-go/store/model.go index 1a6e8f33..a68efbea 100644 --- a/off_chain_data/application-go/store/model.go +++ b/off_chain_data/application-go/store/model.go @@ -1,7 +1,7 @@ package store // Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. -type Writer = func(data LedgerUpdate) +type Writer = func(data LedgerUpdate) error // Ledger update made by a specific transaction. type LedgerUpdate struct {