Move channel header unmarshal to constructor

Remove txError struct.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2025-02-11 17:55:14 +01:00
parent f6858cc7e1
commit 034e9b08c0
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB
8 changed files with 33 additions and 79 deletions

View file

@ -64,7 +64,7 @@ func (atb *AssetTransferBasic) GetAllAssets() ([]Asset, error) {
} }
if len(assetsRaw) == 0 { if len(assetsRaw) == 0 {
return []Asset{}, nil return nil, nil
} }
var assets []Asset var assets []Asset

View file

@ -24,12 +24,11 @@ func getAllAssets(clientConnection grpc.ClientConnInterface) error {
return err return err
} }
assetsJSONformatted, err := json.MarshalIndent(assets, "", " ") formatted, err := json.MarshalIndent(assets, "", " ")
if err != nil { if err != nil {
return err return err
} }
fmt.Println(string(formatted))
fmt.Println(assetsJSONformatted)
return nil return nil
} }

View file

@ -104,11 +104,7 @@ func (b *blockProcessor) process() error {
return err return err
} }
channelHeader, err := validTransaction.ChannelHeader() transactionID := validTransaction.ChannelHeader().GetTxId()
if err != nil {
return err
}
transactionID := channelHeader.GetTxId()
if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil { if err := b.checkpointer.CheckpointTransaction(b.parsedBlock.Number(), transactionID); err != nil {
return err return err
} }
@ -164,11 +160,7 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) {
blockTransactionIDs := []string{} blockTransactionIDs := []string{}
for _, transaction := range transactions { for _, transaction := range transactions {
channelHeader, err := transaction.ChannelHeader() blockTransactionIDs = append(blockTransactionIDs, transaction.ChannelHeader().GetTxId())
if err != nil {
return 0, err
}
blockTransactionIDs = append(blockTransactionIDs, channelHeader.GetTxId())
} }
lastTransactionID := b.checkpointer.TransactionID() lastTransactionID := b.checkpointer.TransactionID()
@ -180,11 +172,13 @@ func (b *blockProcessor) findLastProcessedIndex() (int, error) {
} }
if lastProcessedIndex < 0 { if lastProcessedIndex < 0 {
return lastProcessedIndex, newTxIDNotFoundError( err = fmt.Errorf(
"checkpoint transaction ID %s not found in block %d containing transactions: %s",
lastTransactionID, lastTransactionID,
b.parsedBlock.Number(), b.parsedBlock.Number(),
blockTransactionIDs, strings.Join(blockTransactionIDs, ", "),
) )
return lastProcessedIndex, err
} }
return lastProcessedIndex, nil return lastProcessedIndex, nil
@ -198,11 +192,7 @@ type transactionProcessor struct {
} }
func (t *transactionProcessor) process() error { func (t *transactionProcessor) process() error {
channelHeader, err := t.transaction.ChannelHeader() transactionID := t.transaction.ChannelHeader().GetTxId()
if err != nil {
return err
}
transactionID := channelHeader.GetTxId()
writes, err := t.writes() writes, err := t.writes()
if err != nil { if err != nil {
@ -230,11 +220,7 @@ func (t *transactionProcessor) process() error {
func (t *transactionProcessor) writes() ([]write, error) { func (t *transactionProcessor) writes() ([]write, error) {
// TODO this entire code should live in the parser and just return the kvWrite which // TODO this entire code should live in the parser and just return the kvWrite which
// we then map to write and return // we then map to write and return
channelHeader, err := t.transaction.ChannelHeader() t.channelName = t.transaction.ChannelHeader().GetChannelId()
if err != nil {
return nil, err
}
t.channelName = channelHeader.GetChannelId()
nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets() nsReadWriteSets, err := t.transaction.NamespaceReadWriteSets()
if err != nil { if err != nil {
@ -282,20 +268,3 @@ func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
} }
return slices.Contains(systemChaincodeNames, chaincodeName) return slices.Contains(systemChaincodeNames, chaincodeName)
} }
type txIDNotFoundError struct {
txID string
blockNumber uint64
blockTxIDs []string
}
func newTxIDNotFoundError(txID string, blockNumber uint64, blockTxIds []string) *txIDNotFoundError {
return &txIDNotFoundError{
txID, blockNumber, blockTxIds,
}
}
func (t *txIDNotFoundError) Error() string {
format := "checkpoint transaction ID %s not found in block %d containing transactions: %s"
return fmt.Sprintf(format, t.txID, t.blockNumber, strings.Join(t.blockTxIDs, ", "))
}

View file

@ -76,12 +76,12 @@ func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
for i, commonPayload := range commonPayloads { for i, commonPayload := range commonPayloads {
statusCode := validationCodes[i] statusCode := validationCodes[i]
payload := parsePayload(commonPayload, int32(statusCode)) payload, err := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransaction()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if is {
if payload.isEndorserTransaction() {
result = append(result, payload) result = append(result, payload)
} }
} }

View file

@ -88,7 +88,7 @@ func Test_ReadWriteSetWrapping(t *testing.T) {
parsedRwSet := parseReadWriteSet(txReadWriteSetFake) parsedRwSet := parseReadWriteSet(txReadWriteSetFake)
if len(parsedRwSet.namespaceReadWriteSets()) != 1 { if len(parsedRwSet.namespaceReadWriteSets()) != 1 {
t.Fatalf("Expected 1 NamespaceReadWriteSet, got %d", len(parsedRwSet.namespaceReadWriteSets())) t.Fatal("expected 1 NamespaceReadWriteSet, got", len(parsedRwSet.namespaceReadWriteSets()))
} }
} }

View file

@ -2,7 +2,6 @@ package parser
import ( import (
"fmt" "fmt"
"sync"
"github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/hyperledger/fabric-protos-go-apiv2/peer" "github.com/hyperledger/fabric-protos-go-apiv2/peer"
@ -12,35 +11,28 @@ import (
type payload struct { type payload struct {
commonPayload *common.Payload commonPayload *common.Payload
statusCode int32 statusCode int32
channelHeader func() (*common.ChannelHeader, error) channelHeader *common.ChannelHeader
} }
func parsePayload(commonPayload *common.Payload, statusCode int32) *payload { func parsePayload(commonPayload *common.Payload, statusCode int32) (*payload, error) {
result := &payload{commonPayload, statusCode, nil} channelHeader, err := unmarshalChannelHeaderFrom(commonPayload)
result.channelHeader = sync.OnceValues(result.unmarshalChannelHeader) if err != nil {
return result
}
func (p *payload) unmarshalChannelHeader() (*common.ChannelHeader, error) {
result := &common.ChannelHeader{}
if err := proto.Unmarshal(p.commonPayload.GetHeader().GetChannelHeader(), result); err != nil {
return nil, err return nil, err
} }
return &payload{commonPayload, statusCode, channelHeader}, nil
}
func unmarshalChannelHeaderFrom(commonPayload *common.Payload) (*common.ChannelHeader, error) {
result := &common.ChannelHeader{}
if err := proto.Unmarshal(commonPayload.GetHeader().GetChannelHeader(), result); err != nil {
return nil, err
}
return result, nil return result, nil
} }
func (p *payload) endorserTransaction() (*endorserTransaction, error) { func (p *payload) endorserTransaction() (*endorserTransaction, error) {
is, err := p.isEndorserTransaction() if !p.isEndorserTransaction() {
if err != nil { return nil, fmt.Errorf("unexpected payload type: %d", p.channelHeader.GetType())
return nil, err
}
if !is {
channelHeader, err := p.channelHeader()
if err != nil {
return nil, err
}
return nil, fmt.Errorf("unexpected payload type: %d", channelHeader.GetType())
} }
result := &peer.Transaction{} result := &peer.Transaction{}
@ -51,13 +43,8 @@ func (p *payload) endorserTransaction() (*endorserTransaction, error) {
return parseEndorserTransaction(result), nil return parseEndorserTransaction(result), nil
} }
func (p *payload) isEndorserTransaction() (bool, error) { func (p *payload) isEndorserTransaction() bool {
channelHeader, err := p.channelHeader() return p.channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION)
if err != nil {
return false, err
}
return channelHeader.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION), nil
} }
func (p *payload) isValid() bool { func (p *payload) isValid() bool {

View file

@ -12,8 +12,8 @@ func newTransaction(payload *payload) *Transaction {
return &Transaction{payload} return &Transaction{payload}
} }
func (t *Transaction) ChannelHeader() (*common.ChannelHeader, error) { func (t *Transaction) ChannelHeader() *common.ChannelHeader {
return t.payload.channelHeader() return t.payload.channelHeader
} }
func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) { func (t *Transaction) NamespaceReadWriteSets() ([]*NamespaceReadWriteSet, error) {

View file

@ -127,8 +127,7 @@ func newAsset() (atb.Asset, error) {
// Pick a random element from an array. // Pick a random element from an array.
func randomElement(values []string) string { func randomElement(values []string) string {
result := values[rand.N(len(values))] return values[rand.N(len(values))]
return result
} }
// Pick a random element from an array, excluding the current value. // Pick a random element from an array, excluding the current value.