Skip to content

Commit

Permalink
Remove semaphore from AsyncSimpleConsumerExample
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Ai <[email protected]>
  • Loading branch information
aaron-ai committed Nov 6, 2023
1 parent 20166a2 commit 8769f0a
Showing 1 changed file with 1 addition and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
Expand All @@ -46,7 +45,7 @@ private AsyncSimpleConsumerExample() {
}

@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException, InterruptedException {
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
Expand Down Expand Up @@ -78,9 +77,6 @@ public static void main(String[] args) throws ClientException, InterruptedExcept
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max polling request size
int maxPollingSize = 32;
Semaphore semaphore = new Semaphore(maxPollingSize);
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Expand All @@ -91,11 +87,9 @@ public static void main(String[] args) throws ClientException, InterruptedExcept
ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
// Receive message.
do {
semaphore.acquire();
final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum,
invisibleDuration);
future0.whenCompleteAsync(((messages, throwable) -> {
semaphore.release();
if (null != throwable) {
log.error("Failed to receive message from remote", throwable);
// Return early.
Expand Down

0 comments on commit 8769f0a

Please sign in to comment.