Implement block and transaction processor

Added block processor struct and the process method.
Implemented getting valid transactions from the last processed index.
Added data structures needed for the store.

Decomposed the parser.Block.Transactions() method into readable chunks.

Added transaction processor struct and process method. Unwrapping
read write set data from the transaction, mapping to a new "write"
data structure and passing down to the store.

Store is an empty function and will be implemented next.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2024-12-09 19:13:27 +01:00
parent 8ae2909b9b
commit bd0c03356b
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
2 changed files with 234 additions and 23 deletions

View file

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math"
"offChainData/parser"
"slices"
"strconv"
"github.com/hyperledger/fabric-gateway/pkg/client"
@ -12,6 +14,34 @@ import (
var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
var simulatedFailureCount = getSimulatedFailureCount()
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type store = func(data ledgerUpdate)
// Ledger update made by a specific transaction.
type ledgerUpdate struct {
blockNumber uint64
transactionId string
writes []write
}
// Description of a ledger write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
channelName string
// Namespace within the ledger.
namespace string
// Key name within the ledger namespace.
key string
// Whether the key and associated value are being deleted.
isDelete bool
// If `isDelete` is false, the value written to the key; otherwise ignored.
value []byte
}
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
var applyWritesToOffChainStore = func(data ledgerUpdate) {
}
func listen(clientConnection *grpc.ClientConn) {
id, options := newConnectOptions(clientConnection)
@ -35,6 +65,7 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount)
}
// TODO put into infinite loop like in public docs example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -48,10 +79,103 @@ func listen(clientConnection *grpc.ClientConn) {
}
for blockProto := range blocks {
checkpointer.CheckpointBlock(blockProto.GetHeader().GetNumber())
aBlockProcessor := blockProcessor{
parser.ParseBlock(blockProto),
checkpointer,
applyWritesToOffChainStore,
}
aBlockProcessor.process()
}
}
type blockProcessor struct {
block *parser.Block
checkpointer *client.FileCheckpointer
storeWrites store
}
func (b *blockProcessor) process() {
blockNumber := b.block.Number()
fmt.Println("Received block", blockNumber)
for _, transaction := range b.validTransactions() {
aTransactionProcessor := transactionProcessor{
blockNumber,
transaction,
b.storeWrites,
}
aTransactionProcessor.process()
transactionId := transaction.ChannelHeader().GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
}
b.checkpointer.CheckpointBlock(b.block.Number())
}
func (b blockProcessor) validTransactions() []*parser.Transaction {
result := []*parser.Transaction{}
for _, transaction := range b.getNewTransactions() {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result
}
func (b *blockProcessor) getNewTransactions() []*parser.Transaction {
transactions := b.block.Transactions()
lastTransactionId := b.checkpointer.TransactionID()
if lastTransactionId == "" {
// No previously processed transactions within this block so all are new
return transactions
}
// Ignore transactions up to the last processed transaction ID
lastProcessedIndex := b.findLastProcessedIndex()
return transactions[lastProcessedIndex+1:]
}
func (b blockProcessor) findLastProcessedIndex() int {
blockTransactionIds := []string{}
for _, transaction := range b.block.Transactions() {
blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId())
}
lastTransactionId := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIds {
if id == lastTransactionId {
lastProcessedIndex = index
}
}
if lastProcessedIndex < 0 {
panic(
fmt.Errorf(
"checkpoint transaction ID %s not found in block %d containing transactions: %s",
lastTransactionId,
b.block.Number(),
joinByComma(blockTransactionIds),
),
)
}
return lastProcessedIndex
}
func joinByComma(list []string) string {
result := ""
for index, item := range list {
if len(list)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
}
func getSimulatedFailureCount() uint {
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
valueAsFloat, err := strconv.ParseFloat(valueAsString, 64)
@ -66,3 +190,70 @@ func getSimulatedFailureCount() uint {
return uint(result)
}
type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
storeWrites store
}
func (t *transactionProcessor) process() {
transactionId := t.transaction.ChannelHeader().GetTxId()
fmt.Println("Process transaction", transactionId)
writes := t.writes()
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
return
}
fmt.Println("Process transaction", transactionId)
t.storeWrites(ledgerUpdate{
t.blockNumber,
transactionId,
writes,
})
}
func (t *transactionProcessor) writes() []write {
channelName = t.transaction.ChannelHeader().GetChannelId()
nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}
writes := []write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()
for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
aWrite := write{
channelName,
namespace,
kvWrite.GetKey(),
kvWrite.GetIsDelete(),
kvWrite.GetValue(),
}
writes = append(writes, aWrite)
}
}
return writes
}
func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}

View file

@ -22,28 +22,30 @@ func (b *Block) Number() uint64 {
return header.GetNumber()
}
// TODO: needs cache; decompose
// TODO: needs cache
func (b *Block) Transactions() []*Transaction {
envelopes := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil {
panic(err)
}
envelopes = append(envelopes, envelope)
}
envelopes := b.unmarshalEnvelopesFromBlockData()
commonPayloads := []*common.Payload{}
for _, envelope := range envelopes {
commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
panic(err)
}
commonPayloads = append(commonPayloads, commonPayload)
}
commonPayloads := b.unmarshalPayloadsFrom(envelopes)
payloads := b.parse(commonPayloads)
result := b.createTransactionsFrom(payloads)
return result
}
func (*Block) createTransactionsFrom(payloads []*PayloadImpl) []*Transaction {
result := []*Transaction{}
for _, payload := range payloads {
result = append(result, NewTransaction(payload))
}
return result
}
func (b *Block) parse(commonPayloads []*common.Payload) []*PayloadImpl {
validationCodes := b.extractTransactionValidationCodes()
payloads := []*PayloadImpl{}
result := []*PayloadImpl{}
for i, commonPayload := range commonPayloads {
payload := ParsePayload(
commonPayload,
@ -54,15 +56,33 @@ func (b *Block) Transactions() []*Transaction {
),
)
if payload.IsEndorserTransaction() {
payloads = append(payloads, payload)
result = append(result, payload)
}
}
return result
}
result := []*Transaction{}
for _, payload := range payloads {
result = append(result, NewTransaction(payload))
func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload {
result := []*common.Payload{}
for _, envelope := range envelopes {
commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
panic(err)
}
result = append(result, commonPayload)
}
return result
}
func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope {
result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil {
panic(err)
}
result = append(result, envelope)
}
return result
}