Skip to content

Commit

Permalink
Polish example of simple consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Feb 6, 2023
1 parent 70d4382 commit 6389b7f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -45,7 +44,8 @@ public class AsyncSimpleConsumerExample {
private AsyncSimpleConsumerExample() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
Expand Down Expand Up @@ -81,34 +81,36 @@ public static void main(String[] args) throws ClientException, IOException, Inte
ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
// Set individual thread pool for ack callback.
ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum, invisibleDuration);
future0.whenCompleteAsync(((messages, throwable) -> {
if (null != throwable) {
log.error("Failed to receive message from remote", throwable);
// Return early.
return;
}
log.info("Received {} message(s)", messages.size());
// Using messageView as key rather than message id because message id may be duplicated.
final Map<MessageView, CompletableFuture<Void>> map =
messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
future.whenCompleteAsync((v, t) -> {
if (null != t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
// Return early.
return;
}
log.info("Message is acknowledged successfully, messageId={}", messageId);
}, ackCallbackExecutor);
}
// Receive message.
do {
final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum,
invisibleDuration);
future0.whenCompleteAsync(((messages, throwable) -> {
if (null != throwable) {
log.error("Failed to receive message from remote", throwable);
// Return early.
return;
}
log.info("Received {} message(s)", messages.size());
// Using messageView as key rather than message id because message id may be duplicated.
final Map<MessageView, CompletableFuture<Void>> map =
messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
future.whenCompleteAsync((v, t) -> {
if (null != t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
// Return early.
return;
}
log.info("Message is acknowledged successfully, messageId={}", messageId);
}, ackCallbackExecutor);
}

}), receiveCallbackExecutor);
// Block to avoid exist of background threads.
Thread.sleep(Long.MAX_VALUE);
}), receiveCallbackExecutor);
} while (true);
// Close the simple consumer when you don't need it anymore.
consumer.close();
// consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand All @@ -40,7 +39,8 @@ public class SimpleConsumerExample {
private SimpleConsumerExample() {
}

public static void main(String[] args) throws ClientException, IOException {
@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
Expand Down Expand Up @@ -72,18 +72,21 @@ public static void main(String[] args) throws ClientException, IOException {
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
// Receive message, multi-threading is more recommended.
do {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
} while (true);
// Close the simple consumer when you don't need it anymore.
consumer.close();
// consumer.close();
}
}

0 comments on commit 6389b7f

Please sign in to comment.