Skip to content

Commit

Permalink
Remove redundant clientSession
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Dec 2, 2022
1 parent 5d9e8c5 commit 8aa0ee1
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,15 @@ private void releaseClientSessions() {
}
}

public void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession) {
sessionsLock.writeLock().lock();
try {
sessionsTable.remove(endpoints, clientSession);
} finally {
sessionsLock.writeLock().unlock();
}
}

private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientException {
sessionsLock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private void renewRequestObserver() {
if (sessionHandler.isEndpointsDeprecated(endpoints)) {
log.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}, clientId={}",
endpoints, clientId);
sessionHandler.removeClientSession(endpoints, this);
return;
}
log.info("Try to renew requestObserver, endpoints={}, clientId={}", endpoints, clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.route.Endpoints;

Expand All @@ -47,6 +48,11 @@ public interface ClientSessionHandler {
*/
boolean isEndpointsDeprecated(Endpoints endpoints);

/**
* Remove client session.
*/
void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSession);

/**
* Indicates the client identifier.
*
Expand Down

0 comments on commit 8aa0ee1

Please sign in to comment.