Use Executor in asset-transfer-events/application-gateway-java

The default ForkJoinPool.commonPool may have limited capacity in some environments, risking deadlock. This implementation also better demonstrates handling of connection errors.

Signed-off-by: Mark S. Lewis <Mark.S.Lewis@outlook.com>
This commit is contained in:
Mark S. Lewis 2023-12-02 21:34:12 +00:00 committed by Dave Enyeart
parent c0a0104ca1
commit ce6e519fab

View file

@ -7,6 +7,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import io.grpc.Status;
import org.hyperledger.fabric.client.ChaincodeEvent;
import org.hyperledger.fabric.client.CloseableIterator;
import org.hyperledger.fabric.client.CommitException;
@ -14,15 +15,17 @@ import org.hyperledger.fabric.client.CommitStatusException;
import org.hyperledger.fabric.client.Contract;
import org.hyperledger.fabric.client.EndorseException;
import org.hyperledger.fabric.client.Gateway;
import org.hyperledger.fabric.client.GatewayRuntimeException;
import org.hyperledger.fabric.client.Network;
import org.hyperledger.fabric.client.SubmitException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public final class App {
public final class App implements AutoCloseable {
private static final String channelName = "mychannel";
private static final String chaincodeName = "events";
@ -30,6 +33,7 @@ public final class App {
private final Contract contract;
private final String assetId = "asset" + Instant.now().toEpochMilli();
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
private final ExecutorService executor = Executors.newCachedThreadPool();
public static void main(final String[] args) throws Exception {
var grpcChannel = Connections.newGrpcConnection();
@ -42,8 +46,8 @@ public final class App {
.submitOptions(options -> options.withDeadlineAfter(5, TimeUnit.SECONDS))
.commitStatusOptions(options -> options.withDeadlineAfter(1, TimeUnit.MINUTES));
try (var gateway = builder.connect()) {
new App(gateway).run();
try (var gateway = builder.connect(); var app = new App(gateway)) {
app.run();
} finally {
grpcChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
@ -71,15 +75,22 @@ public final class App {
System.out.println("\n*** Start chaincode event listening");
var eventIter = network.getChaincodeEvents(chaincodeName);
executor.execute(() -> readEvents(eventIter));
CompletableFuture.runAsync(() -> {
return eventIter;
}
private void readEvents(final CloseableIterator<ChaincodeEvent> eventIter) {
try {
eventIter.forEachRemaining(event -> {
var payload = prettyJson(event.getPayload());
System.out.println("\n<-- Chaincode event received: " + event.getEventName() + " - " + payload);
});
});
return eventIter;
} catch (GatewayRuntimeException e) {
if (e.getStatus().getCode() != Status.Code.CANCELLED) {
throw e;
}
}
}
private String prettyJson(final byte[] json) {
@ -154,4 +165,9 @@ public final class App {
}
}
}
@Override
public void close() throws Exception {
executor.shutdownNow();
}
}