Skip to content

Commit

Permalink
Test compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Mar 31, 2023
1 parent fd17c0a commit 5963fd3
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* <p> Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the
* backend thread pool to consumer message concurrently.
*/
public interface MessageListener {
public interface MqMessageListener {
/**
* The callback interface to consume the message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public interface PushConsumer extends Closeable {
* Remove subscription expression dynamically by topic.
*
* <p>It stops the backend task to fetch messages from the server, and besides that, the locally cached message
* whose topic was removed before would not be delivered to {@link MessageListener} anymore.
* whose topic was removed before would not be delivered to {@link MqMessageListener} anymore.
*
* <p>Nothing occurs if the specified topic does not exist in subscription expressions of the push consumer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface PushConsumerBuilder {
* @param listener message listener.
* @return the consumer builder instance.
*/
PushConsumerBuilder setMessageListener(MessageListener listener);
PushConsumerBuilder setMessageListener(MqMessageListener listener);

/**
* Set the maximum number of messages cached locally.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public interface SimpleConsumer extends Closeable {
*
* <p>It stops the backend task to fetch messages from remote, and besides that, the locally cached message whose
* topic
* was removed before would not be delivered to {@link MessageListener} anymore.
* was removed before would not be delivered to {@link MqMessageListener} anymore.
*
* <p>Nothing occurs if the specified topic does not exist in subscription expressions of the push consumer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
Expand All @@ -41,15 +41,15 @@ public abstract class ConsumeService {
private static final Logger log = LoggerFactory.getLogger(ConsumeService.class);

protected final ClientId clientId;
private final MessageListener messageListener;
private final MqMessageListener mqMessageListener;
private final ThreadPoolExecutor consumptionExecutor;
private final MessageInterceptor messageInterceptor;
private final ScheduledExecutorService scheduler;

public ConsumeService(ClientId clientId, MessageListener messageListener, ThreadPoolExecutor consumptionExecutor,
MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) {
public ConsumeService(ClientId clientId, MqMessageListener mqMessageListener, ThreadPoolExecutor consumptionExecutor,
MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) {
this.clientId = clientId;
this.messageListener = messageListener;
this.mqMessageListener = mqMessageListener;
this.consumptionExecutor = consumptionExecutor;
this.messageInterceptor = messageInterceptor;
this.scheduler = scheduler;
Expand All @@ -63,7 +63,7 @@ public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView) {

public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView, Duration delay) {
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor);
final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor);
final ConsumeTask task = new ConsumeTask(clientId, mqMessageListener, messageView, messageInterceptor);
// Consume message with no delay.
if (Duration.ZERO.compareTo(delay) >= 0) {
return executorService.submit(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
Expand All @@ -37,20 +37,20 @@ public class ConsumeTask implements Callable<ConsumeResult> {
private static final Logger log = LoggerFactory.getLogger(ConsumeTask.class);

private final ClientId clientId;
private final MessageListener messageListener;
private final MqMessageListener mqMessageListener;
private final MessageViewImpl messageView;
private final MessageInterceptor messageInterceptor;

public ConsumeTask(ClientId clientId, MessageListener messageListener, MessageViewImpl messageView,
MessageInterceptor messageInterceptor) {
public ConsumeTask(ClientId clientId, MqMessageListener mqMessageListener, MessageViewImpl messageView,
MessageInterceptor messageInterceptor) {
this.clientId = clientId;
this.messageListener = messageListener;
this.mqMessageListener = mqMessageListener;
this.messageView = messageView;
this.messageInterceptor = messageInterceptor;
}

/**
* Invoke {@link MessageListener} to consumer message.
* Invoke {@link MqMessageListener} to consumer message.
*
* @return message(s) which is consumed successfully.
*/
Expand All @@ -61,7 +61,7 @@ public ConsumeResult call() {
MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
messageInterceptor.doBefore(context, generalMessages);
try {
consumeResult = messageListener.consume(messageView);
consumeResult = mqMessageListener.consume(messageView);
} catch (Throwable t) {
log.error("Message listener raised an exception while consuming messages, clientId={}", clientId, t);
// If exception was thrown during the period of message consumption, mark it as failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
Expand All @@ -36,10 +36,10 @@
class FifoConsumeService extends ConsumeService {
private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class);

public FifoConsumeService(ClientId clientId, MessageListener messageListener,
public FifoConsumeService(ClientId clientId, MqMessageListener mqMessageListener,
ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
ScheduledExecutorService scheduler) {
super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
super(clientId, mqMessageListener, consumptionExecutor, messageInterceptor, scheduler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;

Expand All @@ -37,7 +37,7 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder {
private ClientConfiguration clientConfiguration = null;
private String consumerGroup = null;
private Map<String, FilterExpression> subscriptionExpressions = new ConcurrentHashMap<>();
private MessageListener messageListener = null;
private MqMessageListener mqMessageListener = null;
private int maxCacheMessageCount = 1024;
private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
private int consumptionThreadCount = 20;
Expand Down Expand Up @@ -75,11 +75,11 @@ public PushConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpressi
}

/**
* @see PushConsumerBuilder#setMessageListener(MessageListener)
* @see PushConsumerBuilder#setMessageListener(MqMessageListener)
*/
@Override
public PushConsumerBuilder setMessageListener(MessageListener messageListener) {
this.messageListener = checkNotNull(messageListener, "messageListener should not be null");
public PushConsumerBuilder setMessageListener(MqMessageListener mqMessageListener) {
this.mqMessageListener = checkNotNull(mqMessageListener, "messageListener should not be null");
return this;
}

Expand Down Expand Up @@ -120,10 +120,10 @@ public PushConsumerBuilder setConsumptionThreadCount(int consumptionThreadCount)
public PushConsumer build() throws ClientException {
checkNotNull(clientConfiguration, "clientConfiguration has not been set yet");
checkNotNull(consumerGroup, "consumerGroup has not been set yet");
checkNotNull(messageListener, "messageListener has not been set yet");
checkNotNull(mqMessageListener, "messageListener has not been set yet");
checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet");
final PushConsumerImpl pushConsumer = new PushConsumerImpl(clientConfiguration, consumerGroup,
subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes,
subscriptionExpressions, mqMessageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes,
consumptionThreadCount);
pushConsumer.startAsync().awaitRunning();
return pushConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.StatusChecker;
Expand Down Expand Up @@ -98,7 +98,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
private final String consumerGroup;
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
private final ConcurrentMap<String /* topic */, Assignments> cacheAssignments;
private final MessageListener messageListener;
private final MqMessageListener mqMessageListener;
private final int maxCacheMessageCount;
private final int maxCacheMessageSizeInBytes;

Expand All @@ -122,7 +122,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer {
* logging warnings already, so we avoid repeating args check here.
*/
public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup,
Map<String, FilterExpression> subscriptionExpressions, MessageListener messageListener,
Map<String, FilterExpression> subscriptionExpressions, MqMessageListener mqMessageListener,
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
Expand All @@ -132,7 +132,7 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
this.cacheAssignments = new ConcurrentHashMap<>();
this.messageListener = messageListener;
this.mqMessageListener = mqMessageListener;
this.maxCacheMessageCount = maxCacheMessageCount;
this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;

Expand Down Expand Up @@ -193,10 +193,10 @@ private ConsumeService createConsumeService() {
final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
if (pushSubscriptionSettings.isFifo()) {
log.info("Create FIFO consume service, consumerGroup={}, clientId={}", consumerGroup, clientId);
return new FifoConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);
return new FifoConsumeService(clientId, mqMessageListener, consumptionExecutor, this, scheduler);
}
log.info("Create standard consume service, consumerGroup={}, clientId={}", consumerGroup, clientId);
return new StandardConsumeService(clientId, messageListener, consumptionExecutor, this, scheduler);
return new StandardConsumeService(clientId, mqMessageListener, consumptionExecutor, this, scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
Expand All @@ -36,10 +36,10 @@
public class StandardConsumeService extends ConsumeService {
private static final Logger log = LoggerFactory.getLogger(StandardConsumeService.class);

public StandardConsumeService(ClientId clientId, MessageListener messageListener,
public StandardConsumeService(ClientId clientId, MqMessageListener mqMessageListener,
ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
ScheduledExecutorService scheduler) {
super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
super(clientId, mqMessageListener, consumptionExecutor, messageInterceptor, scheduler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.MqMessageListener;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
Expand All @@ -50,8 +50,8 @@ public class ConsumeServiceTest extends TestBase {

@Test
public void testConsumeSuccess() throws ExecutionException, InterruptedException, TimeoutException {
final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS;
final ConsumeService consumeService = new ConsumeService(clientId, messageListener,
final MqMessageListener mqMessageListener = messageView -> ConsumeResult.SUCCESS;
final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener,
consumptionExecutor, interceptor, scheduler) {
@Override
public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
Expand All @@ -65,8 +65,8 @@ public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {

@Test
public void testConsumeFailure() throws ExecutionException, InterruptedException, TimeoutException {
final MessageListener messageListener = messageView -> ConsumeResult.FAILURE;
final ConsumeService consumeService = new ConsumeService(clientId, messageListener,
final MqMessageListener mqMessageListener = messageView -> ConsumeResult.FAILURE;
final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener,
consumptionExecutor, interceptor, scheduler) {
@Override
public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
Expand All @@ -80,10 +80,10 @@ public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {

@Test
public void testConsumeWithException() throws ExecutionException, InterruptedException, TimeoutException {
final MessageListener messageListener = messageView -> {
final MqMessageListener mqMessageListener = messageView -> {
throw new RuntimeException();
};
final ConsumeService consumeService = new ConsumeService(clientId, messageListener,
final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener,
consumptionExecutor, interceptor, scheduler) {
@Override
public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
Expand All @@ -98,8 +98,8 @@ public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {

@Test
public void testConsumeWithDelay() throws ExecutionException, InterruptedException {
final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS;
final ConsumeService consumeService = new ConsumeService(clientId, messageListener,
final MqMessageListener mqMessageListener = messageView -> ConsumeResult.SUCCESS;
final ConsumeService consumeService = new ConsumeService(clientId, mqMessageListener,
consumptionExecutor, interceptor, scheduler) {

@Override
Expand Down
Loading

0 comments on commit 5963fd3

Please sign in to comment.