diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MqMessageListener.java similarity index 97% rename from java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java rename to java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MqMessageListener.java index fc66a4f97..b28d966db 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MqMessageListener.java @@ -25,7 +25,7 @@ *

Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the * backend thread pool to consumer message concurrently. */ -public interface MessageListener { +public interface MqMessageListener { /** * The callback interface to consume the message. * diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java index d123e7511..3fd0c3f3c 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java @@ -89,7 +89,7 @@ public interface PushConsumer extends Closeable { * Remove subscription expression dynamically by topic. * *

It stops the backend task to fetch messages from the server, and besides that, the locally cached message - * whose topic was removed before would not be delivered to {@link MessageListener} anymore. + * whose topic was removed before would not be delivered to {@link MqMessageListener} anymore. * *

Nothing occurs if the specified topic does not exist in subscription expressions of the push consumer. * diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java index e971d9457..08896d252 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java @@ -55,7 +55,7 @@ public interface PushConsumerBuilder { * @param listener message listener. * @return the consumer builder instance. */ - PushConsumerBuilder setMessageListener(MessageListener listener); + PushConsumerBuilder setMessageListener(MqMessageListener listener); /** * Set the maximum number of messages cached locally. diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java index 6d7d9921d..de6fb9a78 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java @@ -81,7 +81,7 @@ public interface SimpleConsumer extends Closeable { * *

It stops the backend task to fetch messages from remote, and besides that, the locally cached message whose * topic - * was removed before would not be delivered to {@link MessageListener} anymore. + * was removed before would not be delivered to {@link MqMessageListener} anymore. * *

Nothing occurs if the specified topic does not exist in subscription expressions of the push consumer. * diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java index 40adb9fb4..6a478efde 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java @@ -29,7 +29,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.hook.MessageInterceptor; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ClientId; @@ -41,15 +41,15 @@ public abstract class ConsumeService { private static final Logger log = LoggerFactory.getLogger(ConsumeService.class); protected final ClientId clientId; - private final MessageListener messageListener; + private final MqMessageListener mqMessageListener; private final ThreadPoolExecutor consumptionExecutor; private final MessageInterceptor messageInterceptor; private final ScheduledExecutorService scheduler; - public ConsumeService(ClientId clientId, MessageListener messageListener, ThreadPoolExecutor consumptionExecutor, - MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) { + public ConsumeService(ClientId clientId, MqMessageListener mqMessageListener, ThreadPoolExecutor consumptionExecutor, + MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) { this.clientId = clientId; - this.messageListener = messageListener; + this.mqMessageListener = mqMessageListener; this.consumptionExecutor = consumptionExecutor; this.messageInterceptor = messageInterceptor; this.scheduler = scheduler; @@ -63,7 +63,7 @@ public ListenableFuture consume(MessageViewImpl messageView) { public ListenableFuture consume(MessageViewImpl messageView, Duration delay) { final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor); - final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor); + final ConsumeTask task = new ConsumeTask(clientId, mqMessageListener, messageView, messageInterceptor); // Consume message with no delay. if (Duration.ZERO.compareTo(delay) >= 0) { return executorService.submit(task); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java index 90e34c5a1..7e873d4bf 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.concurrent.Callable; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.hook.MessageHookPoints; import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus; import org.apache.rocketmq.client.java.hook.MessageInterceptor; @@ -37,20 +37,20 @@ public class ConsumeTask implements Callable { private static final Logger log = LoggerFactory.getLogger(ConsumeTask.class); private final ClientId clientId; - private final MessageListener messageListener; + private final MqMessageListener mqMessageListener; private final MessageViewImpl messageView; private final MessageInterceptor messageInterceptor; - public ConsumeTask(ClientId clientId, MessageListener messageListener, MessageViewImpl messageView, - MessageInterceptor messageInterceptor) { + public ConsumeTask(ClientId clientId, MqMessageListener mqMessageListener, MessageViewImpl messageView, + MessageInterceptor messageInterceptor) { this.clientId = clientId; - this.messageListener = messageListener; + this.mqMessageListener = mqMessageListener; this.messageView = messageView; this.messageInterceptor = messageInterceptor; } /** - * Invoke {@link MessageListener} to consumer message. + * Invoke {@link MqMessageListener} to consumer message. * * @return message(s) which is consumed successfully. */ @@ -61,7 +61,7 @@ public ConsumeResult call() { MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.CONSUME); messageInterceptor.doBefore(context, generalMessages); try { - consumeResult = messageListener.consume(messageView); + consumeResult = mqMessageListener.consume(messageView); } catch (Throwable t) { log.error("Message listener raised an exception while consuming messages, clientId={}", clientId, t); // If exception was thrown during the period of message consumption, mark it as failure. diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java index 07fb6238d..ed909251a 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java @@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.hook.MessageInterceptor; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ClientId; @@ -36,10 +36,10 @@ class FifoConsumeService extends ConsumeService { private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class); - public FifoConsumeService(ClientId clientId, MessageListener messageListener, + public FifoConsumeService(ClientId clientId, MqMessageListener mqMessageListener, ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) { - super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler); + super(clientId, mqMessageListener, consumptionExecutor, messageInterceptor, scheduler); } @Override diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java index d270ae7cb..282d427d5 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.consumer.FilterExpression; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder; @@ -37,7 +37,7 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder { private ClientConfiguration clientConfiguration = null; private String consumerGroup = null; private Map subscriptionExpressions = new ConcurrentHashMap<>(); - private MessageListener messageListener = null; + private MqMessageListener mqMessageListener = null; private int maxCacheMessageCount = 1024; private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024; private int consumptionThreadCount = 20; @@ -75,11 +75,11 @@ public PushConsumerBuilder setSubscriptionExpressions(Map subscriptionExpressions; private final ConcurrentMap cacheAssignments; - private final MessageListener messageListener; + private final MqMessageListener mqMessageListener; private final int maxCacheMessageCount; private final int maxCacheMessageSizeInBytes; @@ -122,7 +122,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer { * logging warnings already, so we avoid repeating args check here. */ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, - Map subscriptionExpressions, MessageListener messageListener, + Map subscriptionExpressions, MqMessageListener mqMessageListener, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) { super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet()); this.clientConfiguration = clientConfiguration; @@ -132,7 +132,7 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer this.consumerGroup = consumerGroup; this.subscriptionExpressions = subscriptionExpressions; this.cacheAssignments = new ConcurrentHashMap<>(); - this.messageListener = messageListener; + this.mqMessageListener = mqMessageListener; this.maxCacheMessageCount = maxCacheMessageCount; this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; @@ -193,10 +193,10 @@ private ConsumeService createConsumeService() { final ScheduledExecutorService scheduler = this.getClientManager().getScheduler(); if (pushSubscriptionSettings.isFifo()) { log.info("Create FIFO consume service, consumerGroup={}, clientId={}", consumerGroup, clientId); - return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler); + return new FifoConsumeService(clientId, mqMessageListener, consumptionExecutor, this, scheduler); } log.info("Create standard consume service, consumerGroup={}, clientId={}", consumerGroup, clientId); - return new StandardConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler); + return new StandardConsumeService(clientId, mqMessageListener, consumptionExecutor, this, scheduler); } /** diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java index 2775077ae..85803046a 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java @@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.hook.MessageInterceptor; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ClientId; @@ -36,10 +36,10 @@ public class StandardConsumeService extends ConsumeService { private static final Logger log = LoggerFactory.getLogger(StandardConsumeService.class); - public StandardConsumeService(ClientId clientId, MessageListener messageListener, + public StandardConsumeService(ClientId clientId, MqMessageListener mqMessageListener, ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) { - super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler); + super(clientId, mqMessageListener, consumptionExecutor, messageInterceptor, scheduler); } @Override diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java index dbf8c89b2..e0802e137 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeServiceTest.java @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.hook.MessageInterceptor; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ClientId; @@ -50,8 +50,8 @@ public class ConsumeServiceTest extends TestBase { @Test public void testConsumeSuccess() throws ExecutionException, InterruptedException, TimeoutException { - final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS; - final ConsumeService consumeService = new ConsumeService(clientId, messageListener, + final MqMessageListener mqMessageListener = messageView -> ConsumeResult.SUCCESS; + final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener, consumptionExecutor, interceptor, scheduler) { @Override public void consume(ProcessQueue pq, List messageViews) { @@ -65,8 +65,8 @@ public void consume(ProcessQueue pq, List messageViews) { @Test public void testConsumeFailure() throws ExecutionException, InterruptedException, TimeoutException { - final MessageListener messageListener = messageView -> ConsumeResult.FAILURE; - final ConsumeService consumeService = new ConsumeService(clientId, messageListener, + final MqMessageListener mqMessageListener = messageView -> ConsumeResult.FAILURE; + final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener, consumptionExecutor, interceptor, scheduler) { @Override public void consume(ProcessQueue pq, List messageViews) { @@ -80,10 +80,10 @@ public void consume(ProcessQueue pq, List messageViews) { @Test public void testConsumeWithException() throws ExecutionException, InterruptedException, TimeoutException { - final MessageListener messageListener = messageView -> { + final MqMessageListener mqMessageListener = messageView -> { throw new RuntimeException(); }; - final ConsumeService consumeService = new ConsumeService(clientId, messageListener, + final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener, consumptionExecutor, interceptor, scheduler) { @Override public void consume(ProcessQueue pq, List messageViews) { @@ -98,8 +98,8 @@ public void consume(ProcessQueue pq, List messageViews) { @Test public void testConsumeWithDelay() throws ExecutionException, InterruptedException { - final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS; - final ConsumeService consumeService = new ConsumeService(clientId, messageListener, + final MqMessageListener mqMessageListener = messageView -> ConsumeResult.SUCCESS; + final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener, consumptionExecutor, interceptor, scheduler) { @Override diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java index 23312142a..46826e1fa 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTaskTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertEquals; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.hook.MessageInterceptor; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.misc.ClientId; @@ -34,10 +34,10 @@ public class ConsumeTaskTest extends TestBase { public void testCallWithConsumeSuccess() { ClientId clientId = new ClientId(); final MessageViewImpl messageView = fakeMessageViewImpl(); - final MessageListener messageListener = Mockito.mock(MessageListener.class); - Mockito.when(messageListener.consume(messageView)).thenReturn(ConsumeResult.SUCCESS); + final MqMessageListener mqMessageListener = Mockito.mock(MqMessageListener.class); + Mockito.when(mqMessageListener.consume(messageView)).thenReturn(ConsumeResult.SUCCESS); final MessageInterceptor messageInterceptor = Mockito.mock(MessageInterceptor.class); - final ConsumeTask consumeTask = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor); + final ConsumeTask consumeTask = new ConsumeTask(clientId, mqMessageListener, messageView, messageInterceptor); final ConsumeResult consumeResult = consumeTask.call(); assertEquals(ConsumeResult.SUCCESS, consumeResult); } @@ -46,10 +46,10 @@ public void testCallWithConsumeSuccess() { public void testCallWithConsumeException() { ClientId clientId = new ClientId(); final MessageViewImpl messageView = fakeMessageViewImpl(); - final MessageListener messageListener = Mockito.mock(MessageListener.class); - Mockito.when(messageListener.consume(messageView)).thenThrow(new RuntimeException()); + final MqMessageListener mqMessageListener = Mockito.mock(MqMessageListener.class); + Mockito.when(mqMessageListener.consume(messageView)).thenThrow(new RuntimeException()); final MessageInterceptor messageInterceptor = Mockito.mock(MessageInterceptor.class); - final ConsumeTask consumeTask = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor); + final ConsumeTask consumeTask = new ConsumeTask(clientId, mqMessageListener, messageView, messageInterceptor); final ConsumeResult consumeResult = consumeTask.call(); assertEquals(ConsumeResult.FAILURE, consumeResult); } diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java index 619cd7d20..a8f81710d 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java @@ -33,7 +33,7 @@ import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.impl.ClientManager; import org.apache.rocketmq.client.java.message.MessageViewImpl; import org.apache.rocketmq.client.java.route.Endpoints; @@ -49,7 +49,7 @@ @RunWith(MockitoJUnitRunner.class) public class ConsumerImplTest extends TestBase { private final Map subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0); - private final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS; + private final MqMessageListener mqMessageListener = messageView -> ConsumeResult.SUCCESS; private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(FAKE_ENDPOINTS).build(); @@ -59,7 +59,7 @@ public void testReceiveMessage() throws ExecutionException, InterruptedException int maxCacheMessageSizeInBytes = 1024; int consumptionThreadCount = 4; PushConsumerImpl pushConsumer = Mockito.spy(new PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, - subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, + subscriptionExpressions, mqMessageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount)); final ClientManager clientManager = Mockito.mock(ClientManager.class); Mockito.doReturn(clientManager).when(pushConsumer).getClientManager(); @@ -84,7 +84,7 @@ public void testAckMessage() throws ExecutionException, InterruptedException { int maxCacheMessageSizeInBytes = 1024; int consumptionThreadCount = 4; PushConsumerImpl pushConsumer = Mockito.spy(new PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, - subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, + subscriptionExpressions, mqMessageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount)); final ClientManager clientManager = Mockito.mock(ClientManager.class); Mockito.doReturn(clientManager).when(pushConsumer).getClientManager(); @@ -105,7 +105,7 @@ public void testChangeInvisibleDuration() throws ExecutionException, Interrupted int maxCacheMessageSizeInBytes = 1024; int consumptionThreadCount = 4; PushConsumerImpl pushConsumer = Mockito.spy(new PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, - subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, + subscriptionExpressions, mqMessageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount)); final ClientManager clientManager = Mockito.mock(ClientManager.class); Mockito.doReturn(clientManager).when(pushConsumer).getClientManager(); diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java index 673e71134..8ba848347 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java @@ -31,7 +31,7 @@ import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; -import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.MqMessageListener; import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Test; @@ -45,7 +45,7 @@ public class PushConsumerImplTest extends TestBase { private final Map subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0); - private final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS; + private final MqMessageListener mqMessageListener = messageView -> ConsumeResult.SUCCESS; private final int maxCacheMessageCount = 8; private final int maxCacheMessageSizeInBytes = 1024; @@ -56,7 +56,7 @@ public class PushConsumerImplTest extends TestBase { @Spy private final PushConsumerImpl pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_CONSUMER_GROUP_0, - subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, + subscriptionExpressions, mqMessageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);