diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index 34f6f0ef..daa9515d 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math" + "offChainData/parser" + "slices" "strconv" "github.com/hyperledger/fabric-gateway/pkg/client" @@ -12,6 +14,34 @@ import ( var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json") var simulatedFailureCount = getSimulatedFailureCount() +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +type store = func(data ledgerUpdate) + +// Ledger update made by a specific transaction. +type ledgerUpdate struct { + blockNumber uint64 + transactionId string + writes []write +} + +// Description of a ledger write that can be applied to an off-chain data store. +type write struct { + // Channel whose ledger is being updated. + channelName string + // Namespace within the ledger. + namespace string + // Key name within the ledger namespace. + key string + // Whether the key and associated value are being deleted. + isDelete bool + // If `isDelete` is false, the value written to the key; otherwise ignored. + value []byte +} + +// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance. +// This implementation just writes to a file. +var applyWritesToOffChainStore = func(data ledgerUpdate) { +} func listen(clientConnection *grpc.ClientConn) { id, options := newConnectOptions(clientConnection) @@ -35,6 +65,7 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount) } + // TODO put into infinite loop like in public docs example ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -48,10 +79,103 @@ func listen(clientConnection *grpc.ClientConn) { } for blockProto := range blocks { - checkpointer.CheckpointBlock(blockProto.GetHeader().GetNumber()) + aBlockProcessor := blockProcessor{ + parser.ParseBlock(blockProto), + checkpointer, + applyWritesToOffChainStore, + } + aBlockProcessor.process() } } +type blockProcessor struct { + block *parser.Block + checkpointer *client.FileCheckpointer + storeWrites store +} + +func (b *blockProcessor) process() { + blockNumber := b.block.Number() + + fmt.Println("Received block", blockNumber) + + for _, transaction := range b.validTransactions() { + aTransactionProcessor := transactionProcessor{ + blockNumber, + transaction, + b.storeWrites, + } + aTransactionProcessor.process() + + transactionId := transaction.ChannelHeader().GetTxId() + b.checkpointer.CheckpointTransaction(blockNumber, transactionId) + } + + b.checkpointer.CheckpointBlock(b.block.Number()) +} + +func (b blockProcessor) validTransactions() []*parser.Transaction { + result := []*parser.Transaction{} + for _, transaction := range b.getNewTransactions() { + if transaction.IsValid() { + result = append(result, transaction) + } + } + return result +} + +func (b *blockProcessor) getNewTransactions() []*parser.Transaction { + transactions := b.block.Transactions() + + lastTransactionId := b.checkpointer.TransactionID() + if lastTransactionId == "" { + // No previously processed transactions within this block so all are new + return transactions + } + + // Ignore transactions up to the last processed transaction ID + lastProcessedIndex := b.findLastProcessedIndex() + return transactions[lastProcessedIndex+1:] +} + +func (b blockProcessor) findLastProcessedIndex() int { + blockTransactionIds := []string{} + for _, transaction := range b.block.Transactions() { + blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId()) + } + + lastTransactionId := b.checkpointer.TransactionID() + lastProcessedIndex := -1 + for index, id := range blockTransactionIds { + if id == lastTransactionId { + lastProcessedIndex = index + } + } + if lastProcessedIndex < 0 { + panic( + fmt.Errorf( + "checkpoint transaction ID %s not found in block %d containing transactions: %s", + lastTransactionId, + b.block.Number(), + joinByComma(blockTransactionIds), + ), + ) + } + return lastProcessedIndex +} + +func joinByComma(list []string) string { + result := "" + for index, item := range list { + if len(list)-1 == index { + result += item + } else { + result += item + ", " + } + } + return result +} + func getSimulatedFailureCount() uint { valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0") valueAsFloat, err := strconv.ParseFloat(valueAsString, 64) @@ -66,3 +190,70 @@ func getSimulatedFailureCount() uint { return uint(result) } + +type transactionProcessor struct { + blockNumber uint64 + transaction *parser.Transaction + storeWrites store +} + +func (t *transactionProcessor) process() { + transactionId := t.transaction.ChannelHeader().GetTxId() + + fmt.Println("Process transaction", transactionId) + + writes := t.writes() + if len(writes) == 0 { + fmt.Println("Skipping read-only or system transaction", transactionId) + return + } + + fmt.Println("Process transaction", transactionId) + + t.storeWrites(ledgerUpdate{ + t.blockNumber, + transactionId, + writes, + }) +} + +func (t *transactionProcessor) writes() []write { + channelName = t.transaction.ChannelHeader().GetChannelId() + + nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{} + for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() { + if !t.isSystemChaincode(nsReadWriteSet.Namespace()) { + nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet) + } + } + + writes := []write{} + for _, readWriteSet := range nonSystemCCReadWriteSets { + namespace := readWriteSet.Namespace() + + for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() { + aWrite := write{ + channelName, + namespace, + kvWrite.GetKey(), + kvWrite.GetIsDelete(), + kvWrite.GetValue(), + } + writes = append(writes, aWrite) + } + } + + return writes +} + +func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool { + systemChaincodeNames := []string{ + "_lifecycle", + "cscc", + "escc", + "lscc", + "qscc", + "vscc", + } + return slices.Contains(systemChaincodeNames, chaincodeName) +} diff --git a/off_chain_data/application-go/parser/blockParser.go b/off_chain_data/application-go/parser/blockParser.go index 5e3ab56e..b9539ea3 100644 --- a/off_chain_data/application-go/parser/blockParser.go +++ b/off_chain_data/application-go/parser/blockParser.go @@ -22,28 +22,30 @@ func (b *Block) Number() uint64 { return header.GetNumber() } -// TODO: needs cache; decompose +// TODO: needs cache func (b *Block) Transactions() []*Transaction { - envelopes := []*common.Envelope{} - for _, blockData := range b.block.GetData().GetData() { - envelope := &common.Envelope{} - if err := proto.Unmarshal(blockData, envelope); err != nil { - panic(err) - } - envelopes = append(envelopes, envelope) - } + envelopes := b.unmarshalEnvelopesFromBlockData() - commonPayloads := []*common.Payload{} - for _, envelope := range envelopes { - commonPayload := &common.Payload{} - if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { - panic(err) - } - commonPayloads = append(commonPayloads, commonPayload) - } + commonPayloads := b.unmarshalPayloadsFrom(envelopes) + payloads := b.parse(commonPayloads) + + result := b.createTransactionsFrom(payloads) + + return result +} + +func (*Block) createTransactionsFrom(payloads []*PayloadImpl) []*Transaction { + result := []*Transaction{} + for _, payload := range payloads { + result = append(result, NewTransaction(payload)) + } + return result +} + +func (b *Block) parse(commonPayloads []*common.Payload) []*PayloadImpl { validationCodes := b.extractTransactionValidationCodes() - payloads := []*PayloadImpl{} + result := []*PayloadImpl{} for i, commonPayload := range commonPayloads { payload := ParsePayload( commonPayload, @@ -54,15 +56,33 @@ func (b *Block) Transactions() []*Transaction { ), ) if payload.IsEndorserTransaction() { - payloads = append(payloads, payload) + result = append(result, payload) } } + return result +} - result := []*Transaction{} - for _, payload := range payloads { - result = append(result, NewTransaction(payload)) +func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload { + result := []*common.Payload{} + for _, envelope := range envelopes { + commonPayload := &common.Payload{} + if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil { + panic(err) + } + result = append(result, commonPayload) } + return result +} +func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope { + result := []*common.Envelope{} + for _, blockData := range b.block.GetData().GetData() { + envelope := &common.Envelope{} + if err := proto.Unmarshal(blockData, envelope); err != nil { + panic(err) + } + result = append(result, envelope) + } return result }