Add first batch of pull request rework

- update Application section in README
- remove param name in app.go
- add error checks in processor/block.go
- move vars from model to transact logic
- move newAsset to transact
- use ID for well-known initialisms
- move randomelement, randomnint and differentelement to transact
- remove AssertDefined
- blockTxIdsJoinedByComma: use standard library to join elements
- return nil, instead of []byte{}
- remove go routine in listen.go
- move cache to parser
- inline processor in listen.go
- move store to main package
- move util to main package
- fixed failing cache issue
- fixed staticcheck issues
- removed cache function, implemented caching in the structs and methods

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2025-01-20 15:00:05 +01:00
parent 40e627dcf5
commit fd1a1fc38b
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
22 changed files with 454 additions and 694 deletions

View file

@ -19,12 +19,15 @@ The client application provides several "commands" that can be invoked using the
- **getAllAssets**: Retrieve the current details of all assets recorded on the ledger. See:
- TypeScript: [application-typescript/src/getAllAssets.ts](application-typescript/src/getAllAssets.ts)
- Java: [application-java/app/src/main/java/GetAllAssets.java](application-java/app/src/main/java/GetAllAssets.java)
- Go: [application-go/getAllAssets.go](application-go/getAllAssets.go)
- **listen**: Listen for block events, and use them to replicate ledger updates in an off-chain data store. See:
- TypeScript: [application-typescript/src/listen.ts](application-typescript/src/listen.ts)
- Java: [application-java/app/src/main/java/Listen.java](application-java/app/src/main/java/Listen.java)
- Go: [application-go/listen.go](application-go/listen.go)
- **transact**: Submit a set of transactions to create, modify and delete assets. See:
- TypeScript: [application-typescript/src/transact.ts](application-typescript/src/transact.ts)
- Java: [application-java/app/src/main/java/Transact.java](application-java/app/src/main/java/Transact.java)
- Go: [application-go/transact.go](application-go/transact.go)
To keep the sample code concise, the **listen** command writes ledger updates to an output file named `store.log` in the current working directory (which for the Java sample is the `application-java/app` directory). A real implementation could write ledger updates directly to an off-chain data store of choice. You can inspect the information captured in this file as you run the sample.

View file

@ -15,7 +15,7 @@ import (
"google.golang.org/grpc"
)
var allCommands = map[string]func(clientConnection *grpc.ClientConn){
var allCommands = map[string]func(*grpc.ClientConn){
"getAllAssets": getAllAssets,
"transact": transact,
"listen": listen,

View file

@ -9,7 +9,6 @@ package main
import (
"crypto/x509"
"fmt"
"offChainData/utils"
"os"
"path"
"time"
@ -24,27 +23,27 @@ import (
const peerName = "peer0.org1.example.com"
var (
channelName = utils.EnvOrDefault("CHANNEL_NAME", "mychannel")
chaincodeName = utils.EnvOrDefault("CHAINCODE_NAME", "basic")
mspID = utils.EnvOrDefault("MSP_ID", "Org1MSP")
channelName = envOrDefault("CHANNEL_NAME", "mychannel")
chaincodeName = envOrDefault("CHAINCODE_NAME", "basic")
mspID = envOrDefault("MSP_ID", "Org1MSP")
// Path to crypto materials.
cryptoPath = utils.EnvOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com")
cryptoPath = envOrDefault("CRYPTO_PATH", "../../test-network/organizations/peerOrganizations/org1.example.com")
// Path to user private key directory.
keyDirectoryPath = utils.EnvOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore")
keyDirectoryPath = envOrDefault("KEY_DIRECTORY_PATH", cryptoPath+"/users/User1@org1.example.com/msp/keystore")
// Path to user certificate.
certPath = utils.EnvOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem")
certPath = envOrDefault("CERT_PATH", cryptoPath+"/users/User1@org1.example.com/msp/signcerts/cert.pem")
// Path to peer tls certificate.
tlsCertPath = utils.EnvOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt")
tlsCertPath = envOrDefault("TLS_CERT_PATH", cryptoPath+"/peers/peer0.org1.example.com/tls/ca.crt")
// Gateway peer endpoint.
peerEndpoint = utils.EnvOrDefault("PEER_ENDPOINT", "dns:///localhost:7051")
peerEndpoint = envOrDefault("PEER_ENDPOINT", "dns:///localhost:7051")
// Gateway peer SSL host name override.
peerHostAlias = utils.EnvOrDefault("PEER_HOST_ALIAS", peerName)
peerHostAlias = envOrDefault("PEER_HOST_ALIAS", peerName)
)
func newGrpcConnection() *grpc.ClientConn {

View file

@ -65,7 +65,7 @@ func (atb *AssetTransferBasic) DeleteAsset(id string) error {
func (atb *AssetTransferBasic) GetAllAssets() ([]byte, error) {
result, err := atb.contract.Evaluate("GetAllAssets")
if err != nil {
return []byte{}, fmt.Errorf("in GetAllAssets: %w", err)
return nil, fmt.Errorf("in GetAllAssets: %w", err)
}
return result, nil
}

View file

@ -5,22 +5,6 @@
*/
package contract
import (
"offChainData/utils"
"github.com/google/uuid"
)
var (
colors = []string{"red", "green", "blue"}
Owners = []string{"alice", "bob", "charlie"}
)
const (
maxInitialValue = 1000
maxInitialSize = 10
)
type Asset struct {
ID string `json:"ID"`
Color string `json:"Color"`
@ -28,18 +12,3 @@ type Asset struct {
Owner string `json:"Owner"`
AppraisedValue uint64 `json:"AppraisedValue"`
}
func NewAsset() Asset {
id, err := uuid.NewRandom()
if err != nil {
panic(err)
}
return Asset{
ID: id.String(),
Color: utils.RandomElement(colors),
Size: uint64(utils.RandomInt(maxInitialSize) + 1),
Owner: utils.RandomElement(Owners),
AppraisedValue: uint64(utils.RandomInt(maxInitialValue) + 1),
}
}

View file

@ -10,7 +10,7 @@ import (
"bytes"
"encoding/json"
"fmt"
atb "offChainData/contract"
atb "offchaindata/contract"
"github.com/hyperledger/fabric-gateway/pkg/client"
"google.golang.org/grpc"

View file

@ -1,4 +1,4 @@
module offChainData
module offchaindata
go 1.22.0

View file

@ -3,13 +3,11 @@ package main
import (
"context"
"fmt"
"offChainData/parser"
"offChainData/processor"
"offChainData/store"
"offChainData/utils"
"offchaindata/parser"
"os"
"os/signal"
"sync"
"slices"
"strings"
"syscall"
"github.com/hyperledger/fabric-gateway/pkg/client"
@ -27,7 +25,7 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Println("Gateway closed.")
}()
checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json")
checkpointFile := envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
checkpointer, err := client.NewFileCheckpointer(checkpointFile)
if err != nil {
panic(err)
@ -39,8 +37,8 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Println("Start event listening from block", checkpointer.BlockNumber())
fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID())
if store.SimulatedFailureCount > 0 {
fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount)
if simulatedFailureCount > 0 {
fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount)
}
ctx, close := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
@ -62,32 +60,251 @@ func listen(clientConnection *grpc.ClientConn) {
panic(err)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for blockProto := range blocks {
select {
case <-ctx.Done():
return
default:
blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto),
checkpointer,
store.ApplyWritesToOffChainStore,
channelName,
)
if err := blockProcessor.Process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
for blockProto := range blocks {
aBlockProcessor := blockProcessor{
parser.ParseBlock(blockProto),
checkpointer,
applyWritesToOffChainStore,
channelName,
}
}()
wg.Wait()
if err := aBlockProcessor.process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
fmt.Println("\nShutting down listener gracefully...")
}
type blockProcessor struct {
parsedBlock *parser.Block
checkpointer *client.FileCheckpointer
writeToStore writer
channelName string
}
func (b *blockProcessor) process() error {
funcName := "Process"
fmt.Println("\nReceived block", b.parsedBlock.Number())
validTransactions, err := b.validTransactions()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
for _, validTransaction := range validTransactions {
aTransaction := transactionProcessor{
b.parsedBlock.Number(),
validTransaction,
// TODO use pointer to parent and get blockNumber, store and channelName from parent
b.writeToStore,
b.channelName,
}
if err := aTransaction.process(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
channelHeader, err := validTransaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionID := channelHeader.GetTxId()
if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
}
if err := b.checkpointer.CheckpointBlock(b.parsedBlock.Number()); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
return nil
}
func (b *blockProcessor) validTransactions() ([]*parser.Transaction, error) {
result := []*parser.Transaction{}
newTransactions, err := b.getNewTransactions()
if err != nil {
return nil, fmt.Errorf("in validTransactions: %w", err)
}
for _, transaction := range newTransactions {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result, nil
}
func (b *blockProcessor) getNewTransactions() ([]*parser.Transaction, error) {
funcName := "getNewTransactions"
transactions, err := b.parsedBlock.Transactions()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
lastTransactionID := b.checkpointer.TransactionID()
if lastTransactionID == "" {
// No previously processed transactions within this block so all are new
return transactions, nil
}
// Ignore transactions up to the last processed transaction ID
lastProcessedIndex, err := b.findLastProcessedIndex()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return transactions[lastProcessedIndex+1:], nil
}
func (b *blockProcessor) findLastProcessedIndex() (int, error) {
funcName := "findLastProcessedIndex"
transactions, err := b.parsedBlock.Transactions()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
blockTransactionIDs := []string{}
for _, transaction := range transactions {
channelHeader, err := transaction.ChannelHeader()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
blockTransactionIDs = append(blockTransactionIDs, channelHeader.GetTxId())
}
lastTransactionID := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIDs {
if id == lastTransactionID {
lastProcessedIndex = index
}
}
if lastProcessedIndex < 0 {
return lastProcessedIndex, newTxIDNotFoundError(
lastTransactionID,
b.parsedBlock.Number(),
blockTransactionIDs,
)
}
return lastProcessedIndex, nil
}
type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
writeToStore writer
channelName string
}
func (t *transactionProcessor) process() error {
funcName := "process"
channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionID := channelHeader.GetTxId()
writes, err := t.writes()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionID)
return nil
}
fmt.Println("Process transaction", transactionID)
if err := t.writeToStore(ledgerUpdate{
BlockNumber: t.blockNumber,
TransactionID: transactionID,
Writes: writes,
}); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
return nil
}
func (t *transactionProcessor) writes() ([]write, error) {
funcName := "writes"
// TODO this entire code should live in the parser and just return the kvWrite which
// we then map to write and return
channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
t.channelName = channelHeader.GetChannelId()
nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range nsReadWriteSets {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}
writes := []write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()
kvReadWriteSet, err := readWriteSet.ReadWriteSet()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
for _, kvWrite := range kvReadWriteSet.GetWrites() {
writes = append(writes, write{
ChannelName: t.channelName,
Namespace: namespace,
Key: kvWrite.GetKey(),
IsDelete: kvWrite.GetIsDelete(),
Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
}
return writes, nil
}
func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}
type txIDNotFoundError struct {
txID string
blockNumber uint64
blockTxIDs []string
}
func newTxIDNotFoundError(txID string, blockNumber uint64, blockTxIds []string) *txIDNotFoundError {
return &txIDNotFoundError{
txID, blockNumber, blockTxIds,
}
}
func (t *txIDNotFoundError) Error() string {
format := "checkpoint transaction ID %s not found in block %d containing transactions: %s"
return fmt.Sprintf(format, t.txID, t.blockNumber, strings.Join(t.blockTxIDs, ", "))
}

View file

@ -2,49 +2,48 @@ package parser
import (
"fmt"
"offChainData/utils"
"github.com/hyperledger/fabric-protos-go-apiv2/common"
"google.golang.org/protobuf/proto"
)
type Block struct {
block *common.Block
transactions []*Transaction
block *common.Block
cachedTransactions []*Transaction
}
func ParseBlock(block *common.Block) *Block {
return &Block{block, []*Transaction{}}
return &Block{block, nil}
}
func (b *Block) Number() (uint64, error) {
header, err := utils.AssertDefined(b.block.GetHeader(), "missing block header")
if err != nil {
return 0, fmt.Errorf("in Number: %w", err)
}
return header.GetNumber(), nil
func (b *Block) Number() uint64 {
return b.block.GetHeader().GetNumber()
}
func (b *Block) Transactions() ([]*Transaction, error) {
return utils.Cache(func() ([]*Transaction, error) {
funcName := "Transactions"
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
if b.cachedTransactions != nil {
return b.cachedTransactions, nil
}
commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
funcName := "Transactions"
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
payloads, err := b.parse(commonPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return b.createTransactionsFrom(payloads), nil
})()
payloads, err := b.parse(commonPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
b.cachedTransactions = b.createTransactionsFrom(payloads)
return b.cachedTransactions, nil
}
func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) {
@ -74,20 +73,11 @@ func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Pay
func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
funcName := "parse"
validationCodes, err := b.extractTransactionValidationCodes()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
validationCodes := b.block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER]
result := []*payload{}
for i, commonPayload := range commonPayloads {
statusCode, err := utils.AssertDefined(
validationCodes[i],
fmt.Sprint("missing validation code index", i),
)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
statusCode := validationCodes[i]
payload := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransaction()
@ -102,21 +92,6 @@ func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
return result, nil
}
func (b *Block) extractTransactionValidationCodes() ([]byte, error) {
metadata, err := utils.AssertDefined(
b.block.GetMetadata(),
"missing block metadata",
)
if err != nil {
return nil, fmt.Errorf("in extractTransactionValidationCodes: %w", err)
}
return utils.AssertDefined(
metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER],
"missing transaction validation code",
)
}
func (*Block) createTransactionsFrom(payloads []*payload) []*Transaction {
result := []*Transaction{}
for _, payload := range payloads {

View file

@ -4,7 +4,7 @@ import (
"encoding/json"
"testing"
atb "offChainData/contract"
atb "offchaindata/contract"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset"
@ -107,7 +107,13 @@ func Test_NamespaceReadWriteSetParsing(t *testing.T) {
func nsReadWriteSetFake() (*rwset.NsReadWriteSet, string, atb.Asset) {
expectedNamespace := "basic"
expectedAsset := atb.NewAsset()
expectedAsset := atb.Asset{
ID: "id-1",
Color: "green",
Size: 8,
Owner: "Alice",
AppraisedValue: 346,
}
result := &rwset.NsReadWriteSet{
Namespace: expectedNamespace,

View file

@ -2,7 +2,6 @@ package parser
import (
"fmt"
"offChainData/utils"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
@ -10,43 +9,49 @@ import (
)
type endorserTransaction struct {
transaction *peer.Transaction
transaction *peer.Transaction
cachedReadWriteSets []*readWriteSet
}
func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransaction {
return &endorserTransaction{transaction}
return &endorserTransaction{transaction, nil}
}
func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) {
return utils.Cache(func() ([]*readWriteSet, error) {
funcName := "readWriteSets"
chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
funcName := "readWriteSets"
chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
if p.cachedReadWriteSets != nil {
return p.cachedReadWriteSets, nil
}
proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return p.parseReadWriteSets(txReadWriteSets), nil
})()
chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
p.cachedReadWriteSets = p.parseReadWriteSets(txReadWriteSets)
return p.cachedReadWriteSets, nil
}
func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) {
@ -65,18 +70,7 @@ func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.Chainc
func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) {
result := []*peer.ChaincodeEndorsedAction{}
for _, payload := range chaincodeActionPayloads {
chaincodeEndorsedAction, err := utils.AssertDefined(
payload.GetAction(),
"missing chaincode endorsed action",
)
if err != nil {
return nil, fmt.Errorf("in extractChaincodeEndorsedActionsFrom: %w", err)
}
result = append(
result,
chaincodeEndorsedAction,
)
result = append(result, payload.GetAction())
}
return result, nil
}

View file

@ -2,7 +2,6 @@ package parser
import (
"fmt"
"offChainData/utils"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset/kvrwset"
@ -10,11 +9,12 @@ import (
)
type NamespaceReadWriteSet struct {
nsReadWriteSet *rwset.NsReadWriteSet
nsReadWriteSet *rwset.NsReadWriteSet
cachedReadWriteSet *kvrwset.KVRWSet
}
func parseNamespaceReadWriteSet(nsRwSet *rwset.NsReadWriteSet) *NamespaceReadWriteSet {
return &NamespaceReadWriteSet{nsRwSet}
return &NamespaceReadWriteSet{nsRwSet, nil}
}
func (p *NamespaceReadWriteSet) Namespace() string {
@ -22,12 +22,14 @@ func (p *NamespaceReadWriteSet) Namespace() string {
}
func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) {
return utils.Cache(func() (*kvrwset.KVRWSet, error) {
result := kvrwset.KVRWSet{}
if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil {
return nil, fmt.Errorf("in ReadWriteSet: %w", err)
}
if p.cachedReadWriteSet != nil {
return p.cachedReadWriteSet, nil
}
return &result, nil
})()
p.cachedReadWriteSet = &kvrwset.KVRWSet{}
if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), p.cachedReadWriteSet); err != nil {
return nil, fmt.Errorf("in ReadWriteSet: %w", err)
}
return p.cachedReadWriteSet, nil
}

View file

@ -2,7 +2,6 @@ package parser
import (
"fmt"
"offChainData/utils"
"github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
@ -10,30 +9,26 @@ import (
)
type payload struct {
commonPayload *common.Payload
statusCode int32
commonPayload *common.Payload
statusCode int32
cachedChannelHeader *common.ChannelHeader
}
func parsePayload(commonPayload *common.Payload, statusCode int32) *payload {
return &payload{commonPayload, statusCode}
return &payload{commonPayload, statusCode, nil}
}
func (p *payload) channelHeader() (*common.ChannelHeader, error) {
return utils.Cache(func() (*common.ChannelHeader, error) {
funcName := "channelHeader"
if p.cachedChannelHeader != nil {
return p.cachedChannelHeader, nil
}
header, err := utils.AssertDefined(p.commonPayload.GetHeader(), "missing payload header")
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
p.cachedChannelHeader = &common.ChannelHeader{}
if err := proto.Unmarshal(p.commonPayload.GetHeader().GetChannelHeader(), p.cachedChannelHeader); err != nil {
return nil, fmt.Errorf("in channelHeader: %w", err)
}
result := &common.ChannelHeader{}
if err := proto.Unmarshal(header.GetChannelHeader(), result); err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return result, nil
})()
return p.cachedChannelHeader, nil
}
func (p *payload) endorserTransaction() (*endorserTransaction, error) {

View file

@ -1,147 +0,0 @@
package processor
import (
"fmt"
"offChainData/parser"
"offChainData/store"
"github.com/hyperledger/fabric-gateway/pkg/client"
)
type block struct {
parsedBlock *parser.Block
checkpointer *client.FileCheckpointer
writeToStore store.Writer
channelName string
}
func NewBlock(
parsedBlock *parser.Block,
checkpointer *client.FileCheckpointer,
writeToStore store.Writer,
channelName string,
) *block {
return &block{
parsedBlock,
checkpointer,
writeToStore,
channelName,
}
}
func (b *block) Process() error {
funcName := "Process"
blockNumber, err := b.parsedBlock.Number()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
fmt.Println("\nReceived block", blockNumber)
validTransactions, err := b.validTransactions()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
for _, validTransaction := range validTransactions {
aTransaction := transaction{
blockNumber,
validTransaction,
// TODO use pointer to parent and get blockNumber, store and channelName from parent
b.writeToStore,
b.channelName,
}
if err := aTransaction.process(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
channelHeader, err := validTransaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionId := channelHeader.GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
}
b.checkpointer.CheckpointBlock(blockNumber)
return nil
}
func (b *block) validTransactions() ([]*parser.Transaction, error) {
result := []*parser.Transaction{}
newTransactions, err := b.getNewTransactions()
if err != nil {
return nil, fmt.Errorf("in validTransactions: %w", err)
}
for _, transaction := range newTransactions {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result, nil
}
func (b *block) getNewTransactions() ([]*parser.Transaction, error) {
funcName := "getNewTransactions"
transactions, err := b.parsedBlock.Transactions()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
lastTransactionId := b.checkpointer.TransactionID()
if lastTransactionId == "" {
// No previously processed transactions within this block so all are new
return transactions, nil
}
// Ignore transactions up to the last processed transaction ID
lastProcessedIndex, err := b.findLastProcessedIndex()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
return transactions[lastProcessedIndex+1:], nil
}
func (b *block) findLastProcessedIndex() (int, error) {
funcName := "findLastProcessedIndex"
transactions, err := b.parsedBlock.Transactions()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
blockTransactionIds := []string{}
for _, transaction := range transactions {
channelHeader, err := transaction.ChannelHeader()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
blockTransactionIds = append(blockTransactionIds, channelHeader.GetTxId())
}
lastTransactionId := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIds {
if id == lastTransactionId {
lastProcessedIndex = index
}
}
if lastProcessedIndex < 0 {
blockNumber, err := b.parsedBlock.Number()
if err != nil {
return 0, fmt.Errorf("in %s: %w", funcName, err)
}
return lastProcessedIndex, newTxIdNotFoundError(
lastTransactionId,
blockNumber,
blockTransactionIds,
)
}
return lastProcessedIndex, nil
}

View file

@ -1,104 +0,0 @@
package processor
import (
"fmt"
"offChainData/parser"
"offChainData/store"
"slices"
)
type transaction struct {
blockNumber uint64
transaction *parser.Transaction
writeToStore store.Writer
channelName string
}
func (t *transaction) process() error {
funcName := "process"
channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
transactionId := channelHeader.GetTxId()
writes, err := t.writes()
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
return nil
}
fmt.Println("Process transaction", transactionId)
if err := t.writeToStore(store.LedgerUpdate{
BlockNumber: t.blockNumber,
TransactionId: transactionId,
Writes: writes,
}); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
}
return nil
}
func (t *transaction) writes() ([]store.Write, error) {
funcName := "writes"
// TODO this entire code should live in the parser and just return the kvWrite which
// we then map to store.Write and return
channelHeader, err := t.transaction.ChannelHeader()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
t.channelName = channelHeader.GetChannelId()
nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
nonSystemCCReadWriteSets := []*parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range nsReadWriteSets {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}
writes := []store.Write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()
kvReadWriteSet, err := readWriteSet.ReadWriteSet()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
for _, kvWrite := range kvReadWriteSet.GetWrites() {
writes = append(writes, store.Write{
ChannelName: t.channelName,
Namespace: namespace,
Key: kvWrite.GetKey(),
IsDelete: kvWrite.GetIsDelete(),
Value: string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
})
}
}
return writes, nil
}
func (t *transaction) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}

View file

@ -1,32 +0,0 @@
package processor
import "fmt"
type txIdNotFoundError struct {
txId string
blockNumber uint64
blockTxIds []string
}
func newTxIdNotFoundError(txId string, blockNumber uint64, blockTxIds []string) *txIdNotFoundError {
return &txIdNotFoundError{
txId, blockNumber, blockTxIds,
}
}
func (t *txIdNotFoundError) Error() string {
format := "checkpoint transaction ID %s not found in block %d containing transactions: %s"
return fmt.Sprintf(format, t.txId, t.blockNumber, t.blockTxIdsJoinedByComma())
}
func (t *txIdNotFoundError) blockTxIdsJoinedByComma() string {
result := ""
for index, item := range t.blockTxIds {
if len(t.blockTxIds)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
}

View file

@ -1,24 +1,47 @@
package store
package main
import (
"encoding/json"
"errors"
"fmt"
"math"
"offChainData/utils"
"os"
"strconv"
"strings"
)
var storeFile = utils.EnvOrDefault("STORE_FILE", "store.log")
var SimulatedFailureCount = getSimulatedFailureCount()
var storeFile = envOrDefault("STORE_FILE", "store.log")
var simulatedFailureCount = getSimulatedFailureCount()
var transactionCount uint = 0 // Used only to simulate failures
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type writer = func(data ledgerUpdate) error
// Ledger update made by a specific transaction.
type ledgerUpdate struct {
BlockNumber uint64
TransactionID string
Writes []write
}
// Description of a ledger Write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
ChannelName string `json:"channelName"`
// Namespace within the ledger.
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
Key string `json:"key"`
// Whether the key and associated value are being deleted.
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
}
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
func ApplyWritesToOffChainStore(data LedgerUpdate) error {
funcName := "ApplyWritesToOffChainStore"
func applyWritesToOffChainStore(data ledgerUpdate) error {
funcName := "applyWritesToOffChainStore"
if err := simulateFailureIfRequired(); err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
@ -52,7 +75,7 @@ func ApplyWritesToOffChainStore(data LedgerUpdate) error {
}
func simulateFailureIfRequired() error {
if SimulatedFailureCount > 0 && transactionCount >= SimulatedFailureCount {
if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount {
transactionCount = 0
return errors.New("expected error: simulated write failure")
}
@ -63,7 +86,7 @@ func simulateFailureIfRequired() error {
}
func getSimulatedFailureCount() uint {
valueAsString := utils.EnvOrDefault("SIMULATED_FAILURE_COUNT", "0")
valueAsString := envOrDefault("SIMULATED_FAILURE_COUNT", "0")
valueAsFloat, err := strconv.ParseFloat(valueAsString, 64)
if err != nil {
panic(err)

View file

@ -1,25 +0,0 @@
package store
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type Writer = func(data LedgerUpdate) error
// Ledger update made by a specific transaction.
type LedgerUpdate struct {
BlockNumber uint64
TransactionId string
Writes []Write
}
// Description of a ledger Write that can be applied to an off-chain data store.
type Write struct {
// Channel whose ledger is being updated.
ChannelName string `json:"channelName"`
// Namespace within the ledger.
Namespace string `json:"namespace"`
// Key name within the ledger namespace.
Key string `json:"key"`
// Whether the key and associated value are being deleted.
IsDelete bool `json:"isDelete"`
// If `isDelete` is false, the Value written to the key; otherwise ignored.
Value string `json:"value"`
}

View file

@ -1,16 +1,20 @@
package main
import (
"crypto/rand"
"fmt"
"math/big"
"sync"
atb "offChainData/contract"
"offChainData/utils"
atb "offchaindata/contract"
"github.com/google/uuid"
"github.com/hyperledger/fabric-gateway/pkg/client"
"google.golang.org/grpc"
)
var owners = []string{"alice", "bob", "charlie"}
func transact(clientConnection *grpc.ClientConn) {
id, options := newConnectOptions(clientConnection)
gateway, err := client.Connect(id, options...)
@ -59,7 +63,7 @@ func (t *transactApp) run() {
func (t *transactApp) transact() error {
funcName := "transact"
anAsset := atb.NewAsset()
anAsset := newAsset()
err := t.smartContract.CreateAsset(anAsset)
if err != nil {
@ -68,8 +72,8 @@ func (t *transactApp) transact() error {
fmt.Println("Created asset", anAsset.ID)
// Transfer randomly 1 in 2 assets to a new owner.
if utils.RandomInt(2) == 0 {
newOwner := utils.DifferentElement(atb.Owners, anAsset.Owner)
if randomInt(2) == 0 {
newOwner := differentElement(owners, anAsset.Owner)
oldOwner, err := t.smartContract.TransferAsset(anAsset.ID, newOwner)
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
@ -78,7 +82,7 @@ func (t *transactApp) transact() error {
}
// Delete randomly 1 in 4 created assets.
if utils.RandomInt(4) == 0 {
if randomInt(4) == 0 {
err := t.smartContract.DeleteAsset(anAsset.ID)
if err != nil {
return fmt.Errorf("in %s: %w", funcName, err)
@ -87,3 +91,45 @@ func (t *transactApp) transact() error {
}
return nil
}
func newAsset() atb.Asset {
id, err := uuid.NewRandom()
if err != nil {
panic(err)
}
return atb.Asset{
ID: id.String(),
Color: randomElement([]string{"red", "green", "blue"}),
Size: uint64(randomInt(10) + 1),
Owner: randomElement(owners),
AppraisedValue: uint64(randomInt(1000) + 1),
}
}
// Pick a random element from an array.
func randomElement(values []string) string {
result := values[randomInt(len(values))]
return result
}
// Generate a random integer in the range 0 to max - 1.
func randomInt(max int) int {
result, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
if err != nil {
panic(err)
}
return int(result.Int64())
}
// Pick a random element from an array, excluding the current value.
func differentElement(values []string, currentValue string) string {
candidateValues := []string{}
for _, v := range values {
if v != currentValue {
candidateValues = append(candidateValues, v)
}
}
return randomElement(candidateValues)
}

View file

@ -0,0 +1,13 @@
package main
import (
"os"
)
func envOrDefault(key, defaultValue string) string {
result := os.Getenv(key)
if result == "" {
return defaultValue
}
return result
}

View file

@ -1,74 +0,0 @@
package utils
import (
"crypto/rand"
"errors"
"fmt"
"math/big"
"os"
)
// Pick a random element from an array.
func RandomElement(values []string) string {
result := values[RandomInt(len(values))]
return result
}
// Generate a random integer in the range 0 to max - 1.
func RandomInt(max int) int {
result, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
if err != nil {
panic(err)
}
return int(result.Int64())
}
// Pick a random element from an array, excluding the current value.
func DifferentElement(values []string, currentValue string) string {
candidateValues := []string{}
for _, v := range values {
if v != currentValue {
candidateValues = append(candidateValues, v)
}
}
return RandomElement(candidateValues)
}
// Return the value if it is defined; otherwise panics with an error message.
func AssertDefined[T any](value T, message string) (T, error) {
if any(value) == any(nil) {
var zeroValue T
return zeroValue, errors.New(message)
}
return value, nil
}
// Wrap a function call with a cache. On first call the wrapped function is invoked to
// obtain a result. Subsequent calls return the cached result.
func Cache[T any](f func() (T, error)) func() (T, error) {
var value T
var err error
var cached bool
return func() (T, error) {
if !cached {
value, err = f()
if err != nil {
var zeroValue T
return zeroValue, fmt.Errorf("in Cache: %w", err)
}
cached = true
}
return value, nil
}
}
func EnvOrDefault(key, defaultValue string) string {
result := os.Getenv(key)
if result == "" {
return defaultValue
}
return result
}

View file

@ -1,100 +0,0 @@
package utils_test
import (
"errors"
"offChainData/utils"
"testing"
)
func Test_cachePrimitiveFunctionResult(t *testing.T) {
counter := 0
f := func() (int, error) {
counter++
return 5, nil
}
cachedFunc := utils.Cache(f)
result1, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
result2, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
if counter != 1 {
t.Error("expected counter to be 1, but got", counter)
}
if result1 != 5 || result2 != 5 {
t.Fatal("expected results to be 5, but got", result1, result2)
}
}
func Test_whenCachedFunctionsErrors_returnError(t *testing.T) {
errorMsg := "error"
f := func() (int, error) {
return 0, errors.New(errorMsg)
}
cachedFunc := utils.Cache(f)
_, err := cachedFunc()
if err == nil {
t.Fatal("expected error, but got", err)
}
if err.Error() != errorMsg {
t.Fatal("expected error message to be 'error', but got", err)
}
}
func Test_cacheWrappedPrimitiveFunctionResult(t *testing.T) {
controlValue := 5
multiplyControlValueBy := func(n int) (int, error) { controlValue *= n; return controlValue, nil }
cachedFunc := utils.Cache(func() (int, error) { return multiplyControlValueBy(5) })
result1, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
result2, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
if controlValue != 25 {
t.Error("expected control value to be 25, but got", controlValue)
}
if result1 != 25 || result2 != 25 {
t.Fatal("expected cached results to be 25, but got", result1, result2)
}
}
func Test_cacheWrappedDataStructureResult(t *testing.T) {
type GreetMe struct {
helloTo string
}
controlStruct := &GreetMe{helloTo: "Hello "}
greet := func(name string) (*GreetMe, error) { controlStruct.helloTo += name; return controlStruct, nil }
cachedFunc := utils.Cache(func() (*GreetMe, error) { return greet("John Doe") })
result1, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
result2, err := cachedFunc()
if err != nil {
t.Fatal("unexpected error:", err)
}
if controlStruct.helloTo != "Hello John Doe" {
t.Error("expected control value to be 'Hello John Doe', but got", controlStruct)
}
if result1.helloTo != "Hello John Doe" || result2.helloTo != "Hello John Doe" {
t.Fatal("expected cached results to be 'Hello John Doe', but got", result1.helloTo, result2.helloTo)
}
}