Refactor store

Add store interface and implement with offChainStore struct. Decompose
storing into small chunks and keep state around storing writes and
failure count.

Move environment variables used for store setup into the setup phase of
the listen function.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2025-02-20 19:12:29 +01:00
parent b03b8cc495
commit fde0cd58cc
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
2 changed files with 103 additions and 66 deletions

View file

@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"slices"
"strconv"
"strings"
"syscall"
@ -35,12 +36,15 @@ func listen(clientConnection grpc.ClientConnInterface) error {
checkpointer.Close()
fmt.Println("Checkpointer closed.")
}()
fmt.Println("Start event listening from block", checkpointer.BlockNumber())
fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID())
simulatedFailureCount := initSimulatedFailureCount()
if simulatedFailureCount > 0 {
fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount)
}
storeFile := envOrDefault("STORE_FILE", "store.log")
offChainStore := newOffChainStore(storeFile, simulatedFailureCount)
ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer func() {
@ -65,7 +69,7 @@ func listen(clientConnection grpc.ClientConnInterface) error {
aBlockProcessor := blockProcessor{
parser.ParseBlock(blockProto),
checkpointer,
applyWritesToOffChainStore,
offChainStore,
}
if err := aBlockProcessor.process(); err != nil {
@ -77,10 +81,46 @@ func listen(clientConnection grpc.ClientConnInterface) error {
return nil
}
func initSimulatedFailureCount() uint {
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
result, err := strconv.ParseUint(valueAsString, 10, 0)
if err != nil {
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
}
return uint(result)
}
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type store interface {
write(ledgerUpdate) error
}
// Ledger update made by a specific transaction.
type ledgerUpdate struct {
BlockNumber uint64
TransactionID string
Writes []write
}
// Description of a ledger Write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
ChannelName string `json:"channelName"`
// Namespace within the ledger.
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
Key string `json:"key"`
// Whether the key and associated value are being deleted.
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
}
type blockProcessor struct {
parsedBlock *parser.Block
checkpointer *client.FileCheckpointer
writeToStore writer
store store
}
func (b *blockProcessor) process() error {
@ -95,7 +135,7 @@ func (b *blockProcessor) process() error {
txProcessor := transactionProcessor{
b.parsedBlock.Number(),
validTransaction,
b.writeToStore,
b.store,
}
if err := txProcessor.process(); err != nil {
return err
@ -182,9 +222,9 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) {
}
type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
writeToStore writer
blockNumber uint64
transaction *parser.Transaction
store store
}
func (t *transactionProcessor) process() error {
@ -201,8 +241,7 @@ func (t *transactionProcessor) process() error {
}
fmt.Println("Process transaction", transactionID)
if err := t.writeToStore(ledgerUpdate{
if err := t.store.write(ledgerUpdate{
BlockNumber: t.blockNumber,
TransactionID: transactionID,
Writes: writes,

View file

@ -5,63 +5,84 @@ import (
"errors"
"fmt"
"os"
"strconv"
"strings"
)
var storeFile = envOrDefault("STORE_FILE", "store.log")
var simulatedFailureCount = getSimulatedFailureCount()
var transactionCount uint = 0 // Used only to simulate failures
var errExpected = errors.New("expected error: simulated write failure")
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type writer = func(ledgerUpdate) error
// Ledger update made by a specific transaction.
type ledgerUpdate struct {
BlockNumber uint64
TransactionID string
Writes []write
type offChainStore struct {
writes, path string
simulatedFailureCount, transactionCount uint
}
// Description of a ledger Write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
ChannelName string `json:"channelName"`
// Namespace within the ledger.
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
Key string `json:"key"`
// Whether the key and associated value are being deleted.
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
func newOffChainStore(path string, simulatedFailureCount uint) *offChainStore {
return &offChainStore{
"",
path,
uint(simulatedFailureCount),
0,
}
}
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
func applyWritesToOffChainStore(data ledgerUpdate) error {
if err := simulateFailureIfRequired(); err != nil {
func (ocs *offChainStore) write(data ledgerUpdate) error {
if err := ocs.simulateFailureIfRequired(); err != nil {
return err
}
writes := []string{}
for _, write := range data.Writes {
ocs.clearLastWrites()
if err := ocs.marshal(data.Writes); err != nil {
return err
}
if err := ocs.persist(); err != nil {
return err
}
return nil
}
func (ocs *offChainStore) simulateFailureIfRequired() error {
if ocs.simulatedFailureCount > 0 && ocs.transactionCount >= ocs.simulatedFailureCount {
ocs.transactionCount = 0
return errExpected
}
ocs.transactionCount += 1
return nil
}
func (ocs *offChainStore) clearLastWrites() {
ocs.writes = ""
}
func (ocs *offChainStore) marshal(writes []write) error {
for _, write := range writes {
marshaled, err := json.Marshal(write)
if err != nil {
return err
}
writes = append(writes, string(marshaled))
ocs.writes += string(marshaled) + "\n"
}
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
return nil
}
func (ocs *offChainStore) persist() error {
f, err := os.OpenFile(ocs.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
f.Close()
return err
if _, writeErr := f.Write([]byte(ocs.writes)); writeErr != nil {
if closeErr := f.Close(); closeErr != nil {
return fmt.Errorf("write error: %v, close error: %v", writeErr, closeErr)
}
return writeErr
}
if err := f.Close(); err != nil {
@ -70,26 +91,3 @@ func applyWritesToOffChainStore(data ledgerUpdate) error {
return nil
}
var errExpected = errors.New("expected error: simulated write failure")
func simulateFailureIfRequired() error {
if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount {
transactionCount = 0
return errExpected
}
transactionCount += 1
return nil
}
func getSimulatedFailureCount() uint {
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
result, err := strconv.ParseUint(valueAsString, 10, 0)
if err != nil {
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
}
return uint(result)
}