Skip to content

Commit

Permalink
fix(rust): fix session life cycle (#646)
Browse files Browse the repository at this point in the history
* fix(rust): fix session life cycle

Signed-off-by: SSpirits <[email protected]>

* fix(rust): fix session life cycle

Signed-off-by: SSpirits <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Dec 6, 2023
1 parent c38f7d5 commit 2d3cdf7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
4 changes: 2 additions & 2 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ impl Client {
},
command = telemetry_command_rx.recv() => {
if let Some(command) = command {
let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await;
if let Err(error) = result {
error!(logger, "handle telemetry command failed: {:?}", error);
}
}
},
_ = &mut shutdown_rx => {
debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
break;
}
}
Expand Down
13 changes: 7 additions & 6 deletions rust/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ pub(crate) struct Session {
shutdown_tx: Option<oneshot::Sender<()>>,
}

impl Clone for Session {
fn clone(&self) -> Self {
impl Session {
pub(crate) fn shadow_session(&self) -> Self {
Session {
logger: self.logger.clone(),
client_id: self.client_id.clone(),
Expand Down Expand Up @@ -580,7 +580,7 @@ impl SessionManager {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
return if session_map.contains_key(&endpoint_url) {
Ok(session_map.get(&endpoint_url).unwrap().clone())
Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
} else {
let mut session = Session::new(
&self.logger,
Expand All @@ -590,16 +590,17 @@ impl SessionManager {
)
.await?;
session.start(settings, telemetry_command_tx).await?;
session_map.insert(endpoint_url.clone(), session.clone());
Ok(session)
let shadow_session = session.shadow_session();
session_map.insert(endpoint_url.clone(), session);
Ok(shadow_session)
};
}

pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> {
let session_map = self.session_map.lock().await;
let mut sessions = Vec::new();
for (_, session) in session_map.iter() {
sessions.push(session.clone());
sessions.push(session.shadow_session());
}
Ok(sessions)
}
Expand Down

0 comments on commit 2d3cdf7

Please sign in to comment.