diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java index 271481031..042c352fe 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java @@ -28,17 +28,19 @@ public class ClientConfiguration { private final SessionCredentialsProvider sessionCredentialsProvider; private final Duration requestTimeout; private final boolean sslEnabled; + private final String namespace; /** * The caller is supposed to have validated the arguments and handled throwing exceptions or * logging warnings already, so we avoid repeating args check here. */ ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider, - Duration requestTimeout, boolean sslEnabled) { + Duration requestTimeout, boolean sslEnabled, String namespace) { this.endpoints = endpoints; this.sessionCredentialsProvider = sessionCredentialsProvider; this.requestTimeout = requestTimeout; this.sslEnabled = sslEnabled; + this.namespace = namespace; } public static ClientConfigurationBuilder newBuilder() { @@ -60,4 +62,8 @@ public Duration getRequestTimeout() { public boolean isSslEnabled() { return sslEnabled; } + + public String getNamespace() { + return namespace; + } } diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java index eb40c88ce..25cc54a4c 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java @@ -31,6 +31,7 @@ public class ClientConfigurationBuilder { private SessionCredentialsProvider sessionCredentialsProvider = null; private Duration requestTimeout = Duration.ofSeconds(3); private boolean sslEnabled = true; + private String namespace = ""; /** * Configure the access point with which the SDK should communicate. @@ -82,6 +83,16 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) { return this; } + /** + * Configure namespace for client + * @param namespace namespace + * @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining. + */ + public ClientConfigurationBuilder setNamespace(String namespace) { + this.namespace = checkNotNull(namespace, "namespace should not be null"); + return this; + } + /** * Finalize the build of {@link ClientConfiguration}. * @@ -90,6 +101,6 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) { public ClientConfiguration build() { checkNotNull(endpoints, "endpoints should not be null"); checkNotNull(requestTimeout, "requestTimeout should not be null"); - return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled); + return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index 846f0ce9e..dac5fe09e 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -608,7 +608,10 @@ public void onFailure(Throwable t) { } protected ListenableFuture fetchTopicRoute0(final String topic) { - Resource topicResource = Resource.newBuilder().setName(topic).build(); + Resource topicResource = Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(topic) + .build(); final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource) .setEndpoints(endpoints.toProtobuf()).build(); final RpcFuture future = diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java index f99230182..88b335c81 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java @@ -25,14 +25,16 @@ import org.apache.rocketmq.client.java.route.Endpoints; public abstract class Settings { + protected final String namespace; protected final ClientId clientId; protected final ClientType clientType; protected final Endpoints accessPoint; protected volatile RetryPolicy retryPolicy; protected final Duration requestTimeout; - public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy, - Duration requestTimeout) { + public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint, + RetryPolicy retryPolicy, Duration requestTimeout) { + this.namespace = namespace; this.clientId = clientId; this.clientType = clientType; this.accessPoint = accessPoint; @@ -40,8 +42,9 @@ public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, this.requestTimeout = requestTimeout; } - public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, Duration requestTimeout) { - this(clientId, clientType, accessPoint, null, requestTimeout); + public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint, + Duration requestTimeout) { + this(namespace, clientId, clientType, accessPoint, null, requestTimeout); } public abstract apache.rocketmq.v2.Settings toProtobuf(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java index a807fd289..795c2caca 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java @@ -123,7 +123,10 @@ protected ListenableFuture receiveMessage(ReceiveMessageRe } private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) { - final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build(); + final Resource topicResource = Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(messageView.getTopic()) + .build(); final AckMessageEntry entry = AckMessageEntry.newBuilder() .setMessageId(messageView.getMessageId().toString()) .setReceiptHandle(messageView.getReceiptHandle()) @@ -134,7 +137,9 @@ private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) { private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView, Duration invisibleDuration) { - final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build(); + final Resource topicResource = Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(messageView.getTopic()).build(); return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource) .setReceiptHandle(messageView.getReceiptHandle()) .setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos())) @@ -219,7 +224,10 @@ public void onFailure(Throwable t) { } protected Resource getProtobufGroup() { - return Resource.newBuilder().setName(consumerGroup).build(); + return Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(consumerGroup) + .build(); } @Override diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 295367ace..2cbc6d021 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -127,9 +127,9 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) { super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet()); this.clientConfiguration = clientConfiguration; - Resource groupResource = new Resource(consumerGroup); - this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, endpoints, groupResource, - clientConfiguration.getRequestTimeout(), subscriptionExpressions); + Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup); + this.pushSubscriptionSettings = new PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId, + endpoints, groupResource, clientConfiguration.getRequestTimeout(), subscriptionExpressions); this.consumerGroup = consumerGroup; this.subscriptionExpressions = subscriptionExpressions; this.cacheAssignments = new ConcurrentHashMap<>(); @@ -261,7 +261,10 @@ private ListenableFuture pickEndpointsToQueryAssignments(String topic } private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) { - apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build(); + apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(topic) + .build(); return QueryAssignmentRequest.newBuilder().setTopic(topicResource) .setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build(); } @@ -500,7 +503,10 @@ public void onFailure(Throwable t) { private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest( MessageViewImpl messageView) { final apache.rocketmq.v2.Resource topicResource = - apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build(); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(messageView.getTopic()) + .build(); return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource) .setReceiptHandle(messageView.getReceiptHandle()) .setMessageId(messageView.getMessageId().toString()) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java index 70338b0cd..26a66a189 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java @@ -50,9 +50,9 @@ public class PushSubscriptionSettings extends Settings { private volatile int receiveBatchSize = 32; private volatile Duration longPollingTimeout = Duration.ofSeconds(30); - public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group, + public PushSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group, Duration requestTimeout, Map subscriptionExpression) { - super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout); + super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout); this.group = group; this.subscriptionExpressions = subscriptionExpression; } @@ -75,7 +75,10 @@ public apache.rocketmq.v2.Settings toProtobuf() { for (Map.Entry entry : subscriptionExpressions.entrySet()) { final FilterExpression filterExpression = entry.getValue(); apache.rocketmq.v2.Resource topic = - apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build(); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(namespace) + .setName(entry.getKey()) + .build(); final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression()); final FilterExpressionType type = filterExpression.getFilterExpressionType(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java index 5d6092a8a..e73774e59 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java @@ -74,9 +74,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration, Map subscriptionExpressions) { super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet()); - Resource groupResource = new Resource(consumerGroup); - this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, endpoints, - groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions); + Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup); + this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId, + endpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions); this.consumerGroup = consumerGroup; this.awaitDuration = awaitDuration; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java index 0ee02edb6..471937633 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java @@ -45,9 +45,9 @@ public class SimpleSubscriptionSettings extends Settings { private final Duration longPollingTimeout; private final Map subscriptionExpressions; - public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group, + public SimpleSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group, Duration requestTimeout, Duration longPollingTimeout, Map subscriptionExpression) { - super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout); + super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout); this.group = group; this.subscriptionExpressions = subscriptionExpression; this.longPollingTimeout = longPollingTimeout; @@ -59,7 +59,9 @@ public apache.rocketmq.v2.Settings toProtobuf() { for (Map.Entry entry : subscriptionExpressions.entrySet()) { final FilterExpression filterExpression = entry.getValue(); apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder() - .setName(entry.getKey()).build(); + .setResourceNamespace(namespace) + .setName(entry.getKey()) + .build(); final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression()); final FilterExpressionType type = filterExpression.getFilterExpressionType(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index 1db6e1793..450a68d58 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -101,8 +101,8 @@ class ProducerImpl extends ClientImpl implements Producer { TransactionChecker checker) { super(clientConfiguration, topics); ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts); - this.publishingSettings = new PublishingSettings(clientId, endpoints, retryPolicy, - clientConfiguration.getRequestTimeout(), topics); + this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints, + retryPolicy, clientConfiguration.getRequestTimeout(), topics); this.checker = checker; this.publishingRouteDataCache = new ConcurrentHashMap<>(); } @@ -259,7 +259,10 @@ public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, M String transactionId, final TransactionResolution resolution) throws ClientException { final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder() .setMessageId(messageId.toString()).setTransactionId(transactionId) - .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build()); + .setTopic(apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(generalMessage.getTopic()) + .build()); switch (resolution) { case COMMIT: builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT); @@ -415,7 +418,8 @@ private ListenableFuture> send(List messages, boo */ private SendMessageRequest wrapSendMessageRequest(List pubMessages, MessageQueueImpl mq) { final List messages = pubMessages.stream() - .map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList()); + .map(publishingMessage -> publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq)) + .collect(Collectors.toList()); return SendMessageRequest.newBuilder().addAllMessages(messages).build(); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java index f1605c33a..29159caa0 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java @@ -45,9 +45,9 @@ public class PublishingSettings extends Settings { private volatile int maxBodySizeBytes = 4 * 1024 * 1024; private volatile boolean validateMessageType = true; - public PublishingSettings(ClientId clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy, - Duration requestTimeout, Set topics) { - super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout); + public PublishingSettings(String namespace, ClientId clientId, Endpoints accessPoint, + ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set topics) { + super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout); this.topics = topics; } @@ -62,8 +62,13 @@ public boolean isValidateMessageType() { @Override public apache.rocketmq.v2.Settings toProtobuf() { final Publishing publishing = Publishing.newBuilder() - .addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build()) - .collect(Collectors.toList())).setValidateMessageType(validateMessageType).build(); + .addAllTopics(topics.stream().map(name -> Resource.newBuilder() + .setResourceNamespace(namespace) + .setName(name) + .build()) + .collect(Collectors.toList())) + .setValidateMessageType(validateMessageType) + .build(); final apache.rocketmq.v2.Settings.Builder builder = apache.rocketmq.v2.Settings.newBuilder() .setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf()) .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java index 0795c82ea..6af6d7370 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java @@ -87,7 +87,7 @@ public MessageType getMessageType() { *

This method should be invoked before each message sending, because the born time is reset before each * invocation, which means that it should not be invoked ahead of time. */ - public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) { + public apache.rocketmq.v2.Message toProtobuf(String namespace, MessageQueueImpl mq) { final apache.rocketmq.v2.SystemProperties.Builder systemPropertiesBuilder = apache.rocketmq.v2.SystemProperties.newBuilder() // Message keys @@ -112,7 +112,7 @@ public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) { // Message group this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup); final SystemProperties systemProperties = systemPropertiesBuilder.build(); - Resource topicResource = Resource.newBuilder().setName(getTopic()).build(); + Resource topicResource = Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build(); return apache.rocketmq.v2.Message.newBuilder() // Topic .setTopic(topicResource) diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java index 20f31f8b0..a771fc25f 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java @@ -41,12 +41,12 @@ public class PushSubscriptionSettingsTest extends TestBase { @Test public void testToProtobuf() { - Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0); + Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0); ClientId clientId = new ClientId(); Map subscriptionExpression = new HashMap<>(); subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression()); final Duration requestTimeout = Duration.ofSeconds(3); - final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId, + final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId, fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression); final Settings settings = pushSubscriptionSettings.toProtobuf(); Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER); @@ -54,26 +54,32 @@ public void testToProtobuf() { Assert.assertTrue(settings.hasSubscription()); final Subscription subscription = settings.getSubscription(); Assert.assertEquals(subscription.getGroup(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_CONSUMER_GROUP_0) + .build()); Assert.assertFalse(subscription.getFifo()); final List subscriptionsList = subscription.getSubscriptionsList(); Assert.assertEquals(subscriptionsList.size(), 1); final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0); Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.TAG); Assert.assertEquals(subscriptionEntry.getTopic(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_TOPIC_0) + .build()); } @Test public void testToProtobufWithSqlExpression() { - Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0); + Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0); ClientId clientId = new ClientId(); Map subscriptionExpression = new HashMap<>(); subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND " + "b=TRUE)", FilterExpressionType.SQL92)); final Duration requestTimeout = Duration.ofSeconds(3); - final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId, + final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId, fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression); final Settings settings = pushSubscriptionSettings.toProtobuf(); Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER); @@ -81,14 +87,20 @@ public void testToProtobufWithSqlExpression() { Assert.assertTrue(settings.hasSubscription()); final Subscription subscription = settings.getSubscription(); Assert.assertEquals(subscription.getGroup(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_CONSUMER_GROUP_0) + .build()); Assert.assertFalse(subscription.getFifo()); final List subscriptionsList = subscription.getSubscriptionsList(); Assert.assertEquals(subscriptionsList.size(), 1); final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0); Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.SQL); Assert.assertEquals(subscriptionEntry.getTopic(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_TOPIC_0) + .build()); } @Test @@ -115,7 +127,7 @@ public void testSync() { subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND " + "b=TRUE)", FilterExpressionType.SQL92)); final Duration requestTimeout = Duration.ofSeconds(3); - final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId, + final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId, fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression); pushSubscriptionSettings.sync(settings); } diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java index 06cf6e9b6..d3ea28778 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java @@ -39,21 +39,24 @@ public class SimpleSubscriptionSettingsTest extends TestBase { @Test public void testToProtobuf() { - Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0); + Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0); ClientId clientId = new ClientId(); Map subscriptionExpression = new HashMap<>(); subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression()); final Duration requestTimeout = Duration.ofSeconds(3); final Duration longPollingTimeout = Duration.ofSeconds(15); - final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, - fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression); + final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(FAKE_NAMESPACE, + clientId, fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression); final Settings settings = simpleSubscriptionSettings.toProtobuf(); Assert.assertEquals(settings.getClientType(), ClientType.SIMPLE_CONSUMER); Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos())); Assert.assertTrue(settings.hasSubscription()); final Subscription subscription = settings.getSubscription(); Assert.assertEquals(subscription.getGroup(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_CONSUMER_GROUP_0) + .build()); Assert.assertFalse(subscription.getFifo()); Assert.assertEquals(subscription.getLongPollingTimeout(), Durations.fromNanos(longPollingTimeout.toNanos())); final List subscriptionsList = subscription.getSubscriptionsList(); @@ -61,27 +64,33 @@ public void testToProtobuf() { final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0); Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.TAG); Assert.assertEquals(subscriptionEntry.getTopic(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_TOPIC_0) + .build()); } @Test public void testToProtobufWithSqlExpression() { - Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0); + Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0); ClientId clientId = new ClientId(); Map subscriptionExpression = new HashMap<>(); subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND " + "b=TRUE)", FilterExpressionType.SQL92)); final Duration requestTimeout = Duration.ofSeconds(3); final Duration longPollingTimeout = Duration.ofSeconds(15); - final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, - fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression); + final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(FAKE_NAMESPACE, + clientId, fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression); final Settings settings = simpleSubscriptionSettings.toProtobuf(); Assert.assertEquals(settings.getClientType(), ClientType.SIMPLE_CONSUMER); Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos())); Assert.assertTrue(settings.hasSubscription()); final Subscription subscription = settings.getSubscription(); Assert.assertEquals(subscription.getGroup(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_CONSUMER_GROUP_0) + .build()); Assert.assertFalse(subscription.getFifo()); Assert.assertEquals(subscription.getLongPollingTimeout(), Durations.fromNanos(longPollingTimeout.toNanos())); final List subscriptionsList = subscription.getSubscriptionsList(); @@ -89,7 +98,10 @@ public void testToProtobufWithSqlExpression() { final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0); Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.SQL); Assert.assertEquals(subscriptionEntry.getTopic(), - apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build()); + apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(FAKE_NAMESPACE) + .setName(FAKE_TOPIC_0) + .build()); } } \ No newline at end of file diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java index ef6723c99..8b74d04b2 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java @@ -86,6 +86,7 @@ import org.mockito.Mockito; public class TestBase { + protected static final String FAKE_NAMESPACE = "foo-bar-namespace"; protected static final ClientId FAKE_CLIENT_ID = new ClientId(); protected static final String FAKE_TOPIC_0 = "foo-bar-topic-0"; @@ -191,6 +192,14 @@ protected MessageQueueImpl fakeMessageQueueImpl(String topic) { return new MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build())); } + protected MessageQueueImpl fakeMessageQueueImpl(String namespace, String topic) { + return new MessageQueueImpl(fakePbMessageQueue0( + Resource.newBuilder() + .setResourceNamespace(namespace) + .setName(topic) + .build())); + } + protected MessageQueueImpl fakeMessageQueueImpl0() { return new MessageQueueImpl(fakePbMessageQueue0()); } @@ -379,8 +388,8 @@ protected ExponentialBackoffRetryPolicy fakeExponentialBackoffRetryPolicy() { } protected PublishingSettings fakeProducerSettings() { - return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(), - Duration.ofSeconds(1), new HashSet<>()); + return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID, fakeEndpoints(), + fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>()); } protected SendReceiptImpl fakeSendReceiptImpl(