Extract block processor and store from listener

Created packages for the flat file store and the processor and moved
functions, variables and constants from listener.go to those packages.
Encapsulated everything not used outside the packages, introduced
model.go files which later might be extracted into a model package and
renamed parser/parsedBlock.go to parser/block.go.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2024-12-31 12:51:52 +01:00
parent c4db079e0c
commit b04bda5b11
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
11 changed files with 320 additions and 275 deletions

View file

@ -9,6 +9,7 @@ package main
import (
"crypto/x509"
"fmt"
"offChainData/utils"
"os"
"path"
"time"
@ -23,37 +24,29 @@ import (
const peerName = "peer0.org1.example.com"
var (
channelName = envOrDefault("CHANNEL_NAME", "mychannel")
chaincodeName = envOrDefault("CHAINCODE_NAME", "basic")
mspID = envOrDefault("MSP_ID", "Org1MSP")
channelName = utils.EnvOrDefault("CHANNEL_NAME", "mychannel")
chaincodeName = utils.EnvOrDefault("CHAINCODE_NAME", "basic")
mspID = utils.EnvOrDefault("MSP_ID", "Org1MSP")
// Path to crypto materials.
cryptoPath = envOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com")
cryptoPath = utils.EnvOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com")
// Path to user private key directory.
keyDirectoryPath = envOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore")
keyDirectoryPath = utils.EnvOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore")
// Path to user certificate.
certPath = envOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem")
certPath = utils.EnvOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem")
// Path to peer tls certificate.
tlsCertPath = envOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt")
tlsCertPath = utils.EnvOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt")
// Gateway peer endpoint.
peerEndpoint = envOrDefault("PEER_ENDPOINT", "dns:///localhost:7051")
peerEndpoint = utils.EnvOrDefault("PEER_ENDPOINT", "dns:///localhost:7051")
// Gateway peer SSL host name override.
peerHostAlias = envOrDefault("PEER_HOST_ALIAS", peerName)
peerHostAlias = utils.EnvOrDefault("PEER_HOST_ALIAS", peerName)
)
func envOrDefault(key, defaultValue string) string {
result := os.Getenv(key)
if result == "" {
return defaultValue
}
return result
}
func newGrpcConnection() *grpc.ClientConn {
certificatePEM, err := os.ReadFile(tlsCertPath)
if err != nil {

View file

@ -2,96 +2,16 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"offChainData/parser"
"os"
"slices"
"strconv"
"strings"
"offChainData/processor"
"offChainData/store"
"offChainData/utils"
"github.com/hyperledger/fabric-gateway/pkg/client"
"google.golang.org/grpc"
)
var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
var storeFile = envOrDefault("STORE_FILE", "store.log")
var simulatedFailureCount = getSimulatedFailureCount()
const startBlock = 0
var transactionCount uint = 0 // Used only to simulate failures
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type store = func(data ledgerUpdate)
// Ledger update made by a specific transaction.
type ledgerUpdate struct {
blockNumber uint64
transactionId string
writes []write
}
// Description of a ledger write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
ChannelName string `json:"channelName"`
// Namespace within the ledger.
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
Key string `json:"key"`
// Whether the key and associated value are being deleted.
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
}
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
var applyWritesToOffChainStore = func(data ledgerUpdate) {
if err := simulateFailureIfRequired(); err != nil {
fmt.Println("[expected error]: " + err.Error())
return
}
writes := []string{}
for _, write := range data.writes {
marshaled, err := json.Marshal(write)
if err != nil {
panic(err)
}
writes = append(writes, string(marshaled))
}
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
f.Close()
panic(err)
}
if err := f.Close(); err != nil {
panic(err)
}
}
func simulateFailureIfRequired() error {
if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount {
transactionCount = 0
return errors.New("simulated write failure")
}
transactionCount += 1
return nil
}
func listen(clientConnection *grpc.ClientConn) {
id, options := newConnectOptions(clientConnection)
gateway, err := client.Connect(id, options...)
@ -100,8 +20,7 @@ func listen(clientConnection *grpc.ClientConn) {
}
defer gateway.Close()
network := gateway.GetNetwork(channelName)
checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json")
checkpointer, err := client.NewFileCheckpointer(checkpointFile)
if err != nil {
panic(err)
@ -110,196 +29,31 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Println("Start event listening from block", checkpointer.BlockNumber())
fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID())
if simulatedFailureCount > 0 {
fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount)
if store.SimulatedFailureCount > 0 {
fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount)
}
// TODO put into infinite loop like in public docs example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
network := gateway.GetNetwork(channelName)
blocks, err := network.BlockEvents(
ctx,
client.WithCheckpoint(checkpointer),
client.WithStartBlock(startBlock), // Used only if there is no checkpoint block number
client.WithStartBlock(0), // Used only if there is no checkpoint block number
)
if err != nil {
panic(err)
}
for blockProto := range blocks {
aBlockProcessor := blockProcessor{
blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto),
checkpointer,
applyWritesToOffChainStore,
}
aBlockProcessor.process()
}
}
type blockProcessor struct {
block *parser.Block
checkpointer *client.FileCheckpointer
storeWrites store
}
func (b *blockProcessor) process() {
blockNumber := b.block.Number()
fmt.Println("Received block", blockNumber)
for _, transaction := range b.validTransactions() {
aTransactionProcessor := transactionProcessor{
blockNumber,
transaction,
b.storeWrites,
}
aTransactionProcessor.process()
transactionId := transaction.ChannelHeader().GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
}
b.checkpointer.CheckpointBlock(b.block.Number())
}
func (b blockProcessor) validTransactions() []*parser.Transaction {
result := []*parser.Transaction{}
for _, transaction := range b.getNewTransactions() {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result
}
func (b *blockProcessor) getNewTransactions() []*parser.Transaction {
transactions := b.block.Transactions()
lastTransactionId := b.checkpointer.TransactionID()
if lastTransactionId == "" {
// No previously processed transactions within this block so all are new
return transactions
}
// Ignore transactions up to the last processed transaction ID
lastProcessedIndex := b.findLastProcessedIndex()
return transactions[lastProcessedIndex+1:]
}
func (b blockProcessor) findLastProcessedIndex() int {
blockTransactionIds := []string{}
for _, transaction := range b.block.Transactions() {
blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId())
}
lastTransactionId := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIds {
if id == lastTransactionId {
lastProcessedIndex = index
}
}
if lastProcessedIndex < 0 {
panic(
fmt.Errorf(
"checkpoint transaction ID %s not found in block %d containing transactions: %s",
lastTransactionId,
b.block.Number(),
joinByComma(blockTransactionIds),
),
store.ApplyWritesToOffChainStore,
channelName,
)
blockProcessor.Process()
}
return lastProcessedIndex
}
func joinByComma(list []string) string {
result := ""
for index, item := range list {
if len(list)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
}
func getSimulatedFailureCount() uint {
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
valueAsFloat, err := strconv.ParseFloat(valueAsString, 64)
if err != nil {
panic(err)
}
result := math.Floor(valueAsFloat)
if valueAsFloat < 0 {
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
}
return uint(result)
}
type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
storeWrites store
}
func (t *transactionProcessor) process() {
transactionId := t.transaction.ChannelHeader().GetTxId()
writes := t.writes()
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
return
}
fmt.Println("Process transaction", transactionId)
t.storeWrites(ledgerUpdate{
t.blockNumber,
transactionId,
writes,
})
}
func (t *transactionProcessor) writes() []write {
channelName = t.transaction.ChannelHeader().GetChannelId()
nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}
writes := []write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()
for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
writes = append(writes, write{
channelName,
namespace,
kvWrite.GetKey(),
kvWrite.GetIsDelete(),
string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
}
return writes
}
func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}

View file

@ -0,0 +1,114 @@
package processor
import (
"fmt"
"offChainData/parser"
"offChainData/store"
"github.com/hyperledger/fabric-gateway/pkg/client"
)
type block struct {
block *parser.Block
checkpointer *client.FileCheckpointer
writeToStore store.Writer
channelName string
}
func NewBlock(
_block *parser.Block,
checkpointer *client.FileCheckpointer,
writeToStore store.Writer,
channelName string,
) *block {
return &block{
_block,
checkpointer,
writeToStore,
channelName,
}
}
func (b *block) Process() {
blockNumber := b.block.Number()
fmt.Println("\nReceived block", blockNumber)
for _, transaction := range b.validTransactions() {
aTransactionProcessor := transactionProcessor{
blockNumber,
transaction,
// TODO use pointer to parent and get blockNumber, store and channelName from parent
b.writeToStore,
b.channelName,
}
aTransactionProcessor.process()
transactionId := transaction.ChannelHeader().GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
}
b.checkpointer.CheckpointBlock(b.block.Number())
}
func (b *block) validTransactions() []*parser.Transaction {
result := []*parser.Transaction{}
for _, transaction := range b.getNewTransactions() {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result
}
func (b *block) getNewTransactions() []*parser.Transaction {
transactions := b.block.Transactions()
lastTransactionId := b.checkpointer.TransactionID()
if lastTransactionId == "" {
// No previously processed transactions within this block so all are new
return transactions
}
// Ignore transactions up to the last processed transaction ID
lastProcessedIndex := b.findLastProcessedIndex()
return transactions[lastProcessedIndex+1:]
}
func (b *block) findLastProcessedIndex() int {
blockTransactionIds := []string{}
for _, transaction := range b.block.Transactions() {
blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId())
}
lastTransactionId := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIds {
if id == lastTransactionId {
lastProcessedIndex = index
}
}
if lastProcessedIndex < 0 {
panic(
fmt.Errorf(
"checkpoint transaction ID %s not found in block %d containing transactions: %s",
lastTransactionId,
b.block.Number(),
b.joinByComma(blockTransactionIds),
),
)
}
return lastProcessedIndex
}
func (b *block) joinByComma(list []string) string {
result := ""
for index, item := range list {
if len(list)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
}

View file

@ -0,0 +1,73 @@
package processor
import (
"fmt"
"offChainData/parser"
"offChainData/store"
"slices"
)
type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
writeToStore store.Writer
channelName string
}
func (t *transactionProcessor) process() {
transactionId := t.transaction.ChannelHeader().GetTxId()
writes := t.writes()
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
return
}
fmt.Println("Process transaction", transactionId)
t.writeToStore(store.LedgerUpdate{
BlockNumber: t.blockNumber,
TransactionId: transactionId,
Writes: writes,
})
}
func (t *transactionProcessor) writes() []store.Write {
t.channelName = t.transaction.ChannelHeader().GetChannelId()
nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}
writes := []store.Write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()
for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
writes = append(writes, store.Write{
ChannelName: t.channelName,
Namespace: namespace,
Key: kvWrite.GetKey(),
IsDelete: kvWrite.GetIsDelete(),
Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
}
return writes
}
func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}

View file

@ -0,0 +1,76 @@
package store
import (
"encoding/json"
"errors"
"fmt"
"math"
"offChainData/utils"
"os"
"strconv"
"strings"
)
var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log")
var SimulatedFailureCount = getSimulatedFailureCount()
var transactionCount uint = 0 // Used only to simulate failures
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
func ApplyWritesToOffChainStore(data LedgerUpdate) {
if err := simulateFailureIfRequired(); err != nil {
fmt.Println("[expected error]: " + err.Error())
return
}
writes := []string{}
for _, write := range data.Writes {
// TODO write also the TxID and block number so that you can compare easier to the output
marshaled, err := json.Marshal(write)
if err != nil {
panic(err)
}
writes = append(writes, string(marshaled))
}
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
panic(err)
}
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
f.Close()
panic(err)
}
if err := f.Close(); err != nil {
panic(err)
}
}
func simulateFailureIfRequired() error {
if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount {
transactionCount = 0
return errors.New("simulated write failure")
}
transactionCount += 1
return nil
}
func getSimulatedFailureCount() uint {
valueAsString := utils.EnvOrDefault("SIMULATED_FAILURE_COUNT", "0")
valueAsFloat, err := strconv.ParseFloat(valueAsString, 64)
if err != nil {
panic(err)
}
result := math.Floor(valueAsFloat)
if valueAsFloat < 0 {
panic(fmt.Errorf("invalid SIMULATED_FAILURE_COUNT value: %s", valueAsString))
}
return uint(result)
}

View file

@ -0,0 +1,25 @@
package store
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type Writer = func(data LedgerUpdate)
// Ledger update made by a specific transaction.
type LedgerUpdate struct {
BlockNumber uint64
TransactionId string
Writes []Write
}
// Description of a ledger Write that can be applied to an off-chain data store.
type Write struct {
// Channel whose ledger is being updated.
ChannelName string `json:"channelName"`
// Namespace within the ledger.
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
Key string `json:"key"`
// Whether the key and associated value are being deleted.
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
}

View file

@ -44,6 +44,7 @@ func (t *transactApp) transact() {
anAsset := atb.NewAsset()
t.smartContract.CreateAsset(anAsset)
// TODO print txID to compare easier with block processing
fmt.Println("Created asset", anAsset.ID)
// Transfer randomly 1 in 2 assets to a new owner.

View file

@ -4,6 +4,7 @@ import (
"crypto/rand"
"errors"
"math/big"
"os"
)
// Pick a random element from an array.
@ -55,3 +56,11 @@ func Cache[T any](f func() T) func() T {
return value.(T)
}
}
func EnvOrDefault(key, defaultValue string) string {
result := os.Getenv(key)
if result == "" {
return defaultValue
}
return result
}