Skip to content

Commit

Permalink
Set attemptId for ReceiveMessageRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Jun 14, 2023
1 parent edcd7f9 commit ad2279c
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -198,8 +199,12 @@ private void receiveMessageLater(Duration delay, String attemptId) {
}
}

private String generateAttemptId() {
return UUID.randomUUID().toString();
}

public void receiveMessage() {
receiveMessage(null);
receiveMessage(this.generateAttemptId());
}

public void receiveMessage(String attemptId) {
Expand All @@ -217,7 +222,7 @@ public void receiveMessage(String attemptId) {
}

private void receiveMessageImmediately() {
receiveMessageImmediately(null);
receiveMessageImmediately(this.generateAttemptId());
}

private void receiveMessageImmediately(String attemptId) {
Expand Down Expand Up @@ -275,8 +280,8 @@ public void onFailure(Throwable t) {
new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
consumer.doAfter(context0, Collections.emptyList());

log.error("Exception raised during message reception, mq={}, endpoints={}, clientId={}", mq,
endpoints, clientId, t);
log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}," +
" clientId={}", mq, endpoints, request.getAttemptId(), clientId, t);
onReceiveMessageException(t, attemptId);
}
}, MoreExecutors.directExecutor());
Expand Down

0 comments on commit ad2279c

Please sign in to comment.