diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go index 4a974e78..d6200dba 100644 --- a/off_chain_data/application-go/listen.go +++ b/off_chain_data/application-go/listen.go @@ -7,6 +7,10 @@ import ( "offChainData/processor" "offChainData/store" "offChainData/utils" + "os" + "os/signal" + "sync" + "syscall" "github.com/hyperledger/fabric-gateway/pkg/client" "google.golang.org/grpc" @@ -18,14 +22,20 @@ func listen(clientConnection *grpc.ClientConn) { if err != nil { panic(err) } - defer gateway.Close() + defer func() { + gateway.Close() + fmt.Println("Gateway closed.") + }() checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json") checkpointer, err := client.NewFileCheckpointer(checkpointFile) if err != nil { panic(err) } - defer checkpointer.Close() + defer func() { + checkpointer.Close() + fmt.Println("Checkpointer closed.") + }() fmt.Println("Start event listening from block", checkpointer.BlockNumber()) fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) @@ -33,9 +43,11 @@ func listen(clientConnection *grpc.ClientConn) { fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) } - // TODO put into infinite loop like in public docs example - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer func() { + stop() + fmt.Println("Context closed.") + }() network := gateway.GetNetwork(channelName) blocks, err := network.BlockEvents( @@ -47,13 +59,28 @@ func listen(clientConnection *grpc.ClientConn) { panic(err) } - for blockProto := range blocks { - blockProcessor := processor.NewBlock( - parser.ParseBlock(blockProto), - checkpointer, - store.ApplyWritesToOffChainStore, - channelName, - ) - blockProcessor.Process() - } + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + for blockProto := range blocks { + select { + case <-ctx.Done(): + return + default: + blockProcessor := processor.NewBlock( + parser.ParseBlock(blockProto), + checkpointer, + store.ApplyWritesToOffChainStore, + channelName, + ) + blockProcessor.Process() + } + } + }() + + wg.Wait() + fmt.Println("\nReceived 'SIGTERM' signal. Shutting down listener gracefully...") }