diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index 91d7ba3b6..b3981b602 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -359,6 +359,15 @@ private void releaseClientSessions() { } } + public void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession) { + sessionsLock.writeLock().lock(); + try { + sessionsTable.remove(endpoints, clientSession); + } finally { + sessionsLock.writeLock().unlock(); + } + } + private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientException { sessionsLock.readLock().lock(); try { diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java index 2d6915128..663b62edc 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java @@ -65,6 +65,7 @@ private void renewRequestObserver() { if (sessionHandler.isEndpointsDeprecated(endpoints)) { log.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}, clientId={}", endpoints, clientId); + sessionHandler.removeClientSession(endpoints, this); return; } log.info("Try to renew requestObserver, endpoints={}, clientId={}", endpoints, clientId); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java index 4464b4afd..9e68692a3 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java @@ -25,6 +25,7 @@ import io.grpc.stub.StreamObserver; import java.util.concurrent.ScheduledExecutorService; import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.java.impl.ClientSessionImpl; import org.apache.rocketmq.client.java.misc.ClientId; import org.apache.rocketmq.client.java.route.Endpoints; @@ -47,6 +48,11 @@ public interface ClientSessionHandler { */ boolean isEndpointsDeprecated(Endpoints endpoints); + /** + * Remove client session. + */ + void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession); + /** * Indicates the client identifier. *