Implement graceful shutdown of listen function

Before when pressing 'ctrl+c' and stopping the go program non of the
deferred functions in listen.go were called. A standard procedure for
stopping goroutines with context was implemented which shuts down the
program gracefully. Logs were added to notify the user that the shutdown
was successful.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
This commit is contained in:
Stanislav Jakuschevskij 2025-01-01 17:57:09 +01:00
parent b04bda5b11
commit 624f65da12
No known key found for this signature in database
GPG key ID: 78195D2E6998E2EB

View file

@ -7,6 +7,10 @@ import (
"offChainData/processor" "offChainData/processor"
"offChainData/store" "offChainData/store"
"offChainData/utils" "offChainData/utils"
"os"
"os/signal"
"sync"
"syscall"
"github.com/hyperledger/fabric-gateway/pkg/client" "github.com/hyperledger/fabric-gateway/pkg/client"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -18,14 +22,20 @@ func listen(clientConnection *grpc.ClientConn) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer gateway.Close() defer func() {
gateway.Close()
fmt.Println("Gateway closed.")
}()
checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json") checkpointFile := utils.EnvOrDefault("CHECKPOINT_FILE", "checkpoint.json")
checkpointer, err := client.NewFileCheckpointer(checkpointFile) checkpointer, err := client.NewFileCheckpointer(checkpointFile)
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer checkpointer.Close() defer func() {
checkpointer.Close()
fmt.Println("Checkpointer closed.")
}()
fmt.Println("Start event listening from block", checkpointer.BlockNumber()) fmt.Println("Start event listening from block", checkpointer.BlockNumber())
fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID()) fmt.Println("Last processed transaction ID within block:", checkpointer.TransactionID())
@ -33,9 +43,11 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount) fmt.Printf("Simulating a write failure every %d transactions\n", store.SimulatedFailureCount)
} }
// TODO put into infinite loop like in public docs example ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background()) defer func() {
defer cancel() stop()
fmt.Println("Context closed.")
}()
network := gateway.GetNetwork(channelName) network := gateway.GetNetwork(channelName)
blocks, err := network.BlockEvents( blocks, err := network.BlockEvents(
@ -47,7 +59,17 @@ func listen(clientConnection *grpc.ClientConn) {
panic(err) panic(err)
} }
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for blockProto := range blocks { for blockProto := range blocks {
select {
case <-ctx.Done():
return
default:
blockProcessor := processor.NewBlock( blockProcessor := processor.NewBlock(
parser.ParseBlock(blockProto), parser.ParseBlock(blockProto),
checkpointer, checkpointer,
@ -57,3 +79,8 @@ func listen(clientConnection *grpc.ClientConn) {
blockProcessor.Process() blockProcessor.Process()
} }
} }
}()
wg.Wait()
fmt.Println("\nReceived 'SIGTERM' signal. Shutting down listener gracefully...")
}