From 8769f0aa871f6324567be2f1dff0453e78c24b31 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Mon, 6 Nov 2023 09:54:51 +0800 Subject: [PATCH] Remove semaphore from AsyncSimpleConsumerExample Signed-off-by: Aaron Ai --- .../client/java/example/AsyncSimpleConsumerExample.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java index 0116da2e4..63b0fba71 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java @@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.stream.Collectors; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; @@ -46,7 +45,7 @@ private AsyncSimpleConsumerExample() { } @SuppressWarnings({"resource", "InfiniteLoopStatement"}) - public static void main(String[] args) throws ClientException, InterruptedException { + public static void main(String[] args) throws ClientException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. @@ -78,9 +77,6 @@ public static void main(String[] args) throws ClientException, InterruptedExcept // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); - // Max polling request size - int maxPollingSize = 32; - Semaphore semaphore = new Semaphore(maxPollingSize); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. @@ -91,11 +87,9 @@ public static void main(String[] args) throws ClientException, InterruptedExcept ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool(); // Receive message. do { - semaphore.acquire(); final CompletableFuture> future0 = consumer.receiveAsync(maxMessageNum, invisibleDuration); future0.whenCompleteAsync(((messages, throwable) -> { - semaphore.release(); if (null != throwable) { log.error("Failed to receive message from remote", throwable); // Return early.