mirror of
https://github.com/hyperledger/fabric-samples.git
synced 2026-06-21 17:15:10 +00:00
Implement store
Persisting ledger writes to the file system into the store.log file in the application-go directory. The write values are converted from bytes to a string when the read write sets are unwrapped in the transaction processor. Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
parent
bd0c03356b
commit
095bf304b4
2 changed files with 61 additions and 15 deletions
|
|
@ -2,18 +2,28 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"offChainData/parser"
|
"offChainData/parser"
|
||||||
|
"os"
|
||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/hyperledger/fabric-gateway/pkg/client"
|
"github.com/hyperledger/fabric-gateway/pkg/client"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
|
var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
|
||||||
|
var storeFile = envOrDefault("STORE_FILE", "store.log")
|
||||||
var simulatedFailureCount = getSimulatedFailureCount()
|
var simulatedFailureCount = getSimulatedFailureCount()
|
||||||
|
|
||||||
|
const startBlock = 0
|
||||||
|
|
||||||
|
var transactionCount uint = 0 // Used only to simulate failures
|
||||||
|
|
||||||
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
|
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
|
||||||
type store = func(data ledgerUpdate)
|
type store = func(data ledgerUpdate)
|
||||||
|
|
||||||
|
|
@ -27,20 +37,59 @@ type ledgerUpdate struct {
|
||||||
// Description of a ledger write that can be applied to an off-chain data store.
|
// Description of a ledger write that can be applied to an off-chain data store.
|
||||||
type write struct {
|
type write struct {
|
||||||
// Channel whose ledger is being updated.
|
// Channel whose ledger is being updated.
|
||||||
channelName string
|
ChannelName string `json:"channelName"`
|
||||||
// Namespace within the ledger.
|
// Namespace within the ledger.
|
||||||
namespace string
|
Namespace string `json:"namespace"`
|
||||||
// Key name within the ledger namespace.
|
// Key name within the ledger namespace.
|
||||||
key string
|
Key string `json:"key"`
|
||||||
// Whether the key and associated value are being deleted.
|
// Whether the key and associated value are being deleted.
|
||||||
isDelete bool
|
IsDelete bool `json:"isDelete"`
|
||||||
// If `isDelete` is false, the value written to the key; otherwise ignored.
|
// If `isDelete` is false, the Value written to the key; otherwise ignored.
|
||||||
value []byte
|
Value string `json:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
|
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
|
||||||
// This implementation just writes to a file.
|
// This implementation just writes to a file.
|
||||||
var applyWritesToOffChainStore = func(data ledgerUpdate) {
|
var applyWritesToOffChainStore = func(data ledgerUpdate) {
|
||||||
|
if err := simulateFailureIfRequired(); err != nil {
|
||||||
|
fmt.Println("[expected error]: " + err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
writes := []string{}
|
||||||
|
for _, write := range data.writes {
|
||||||
|
marshaled, err := json.Marshal(write)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writes = append(writes, string(marshaled))
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := f.Write([]byte(strings.Join(writes, "\n") + "\n")); err != nil {
|
||||||
|
f.Close()
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func simulateFailureIfRequired() error {
|
||||||
|
if simulatedFailureCount > 0 && transactionCount >= simulatedFailureCount {
|
||||||
|
transactionCount = 0
|
||||||
|
return errors.New("simulated write failure")
|
||||||
|
}
|
||||||
|
|
||||||
|
transactionCount += 1
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func listen(clientConnection *grpc.ClientConn) {
|
func listen(clientConnection *grpc.ClientConn) {
|
||||||
|
|
@ -62,7 +111,7 @@ func listen(clientConnection *grpc.ClientConn) {
|
||||||
fmt.Printf("Start event listening from block %d\n", checkpointer.BlockNumber())
|
fmt.Printf("Start event listening from block %d\n", checkpointer.BlockNumber())
|
||||||
fmt.Printf("Last processed transaction ID within block: %s\n", checkpointer.TransactionID())
|
fmt.Printf("Last processed transaction ID within block: %s\n", checkpointer.TransactionID())
|
||||||
if simulatedFailureCount > 0 {
|
if simulatedFailureCount > 0 {
|
||||||
fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount)
|
fmt.Printf("Simulating a write failure every %d transactions\n", simulatedFailureCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO put into infinite loop like in public docs example
|
// TODO put into infinite loop like in public docs example
|
||||||
|
|
@ -71,8 +120,8 @@ func listen(clientConnection *grpc.ClientConn) {
|
||||||
|
|
||||||
blocks, err := network.BlockEvents(
|
blocks, err := network.BlockEvents(
|
||||||
ctx,
|
ctx,
|
||||||
client.WithStartBlock(0),
|
|
||||||
client.WithCheckpoint(checkpointer),
|
client.WithCheckpoint(checkpointer),
|
||||||
|
client.WithStartBlock(startBlock), // Used only if there is no checkpoint block number
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
@ -200,8 +249,6 @@ type transactionProcessor struct {
|
||||||
func (t *transactionProcessor) process() {
|
func (t *transactionProcessor) process() {
|
||||||
transactionId := t.transaction.ChannelHeader().GetTxId()
|
transactionId := t.transaction.ChannelHeader().GetTxId()
|
||||||
|
|
||||||
fmt.Println("Process transaction", transactionId)
|
|
||||||
|
|
||||||
writes := t.writes()
|
writes := t.writes()
|
||||||
if len(writes) == 0 {
|
if len(writes) == 0 {
|
||||||
fmt.Println("Skipping read-only or system transaction", transactionId)
|
fmt.Println("Skipping read-only or system transaction", transactionId)
|
||||||
|
|
@ -232,14 +279,13 @@ func (t *transactionProcessor) writes() []write {
|
||||||
namespace := readWriteSet.Namespace()
|
namespace := readWriteSet.Namespace()
|
||||||
|
|
||||||
for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
|
for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
|
||||||
aWrite := write{
|
writes = append(writes, write{
|
||||||
channelName,
|
channelName,
|
||||||
namespace,
|
namespace,
|
||||||
kvWrite.GetKey(),
|
kvWrite.GetKey(),
|
||||||
kvWrite.GetIsDelete(),
|
kvWrite.GetIsDelete(),
|
||||||
kvWrite.GetValue(),
|
string(kvWrite.GetValue()), // Convert bytes to text, purely for readability in output
|
||||||
}
|
})
|
||||||
writes = append(writes, aWrite)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ func newTransactApp(smartContract *atb.AssetTransferBasic) *transactApp {
|
||||||
|
|
||||||
func (t *transactApp) run() {
|
func (t *transactApp) run() {
|
||||||
for i := 0; i < int(t.batchSize); i++ {
|
for i := 0; i < int(t.batchSize); i++ {
|
||||||
go t.transact()
|
t.transact()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue