Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restrict max concurrent outbound dials #6860

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,11 @@ where
.peers_mut()
.on_incoming_session_established(peer_id, remote_addr);
}

if direction.is_outgoing() {
self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
}

self.event_listeners.notify(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
Expand Down
97 changes: 80 additions & 17 deletions crates/net/network/src/peers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,15 @@ impl PeersManager {
self.fill_outbound_slots();
}

/// Called when a _pending_ outbound connection is successful.
pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
self.connection_info.decr_state(peer.state);
self.connection_info.inc_out();
peer.state = PeerConnectionState::Out;
abhijeetbhagat marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Called when an _active_ session to a peer was forcefully dropped due to an error.
///
/// Depending on whether the error is fatal, the peer will be removed from the peer set
Expand Down Expand Up @@ -698,7 +707,6 @@ impl PeersManager {
}

// as long as there a slots available fill them with the best peers
let mut new_outbound_dials = 1;
while self.connection_info.has_out_capacity() {
let action = {
let (peer_id, peer) = match self.best_unconnected() {
Expand All @@ -713,18 +721,13 @@ impl PeersManager {

trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");

peer.state = PeerConnectionState::Out;
peer.state = PeerConnectionState::PendingOut;
PeerAction::Connect { peer_id, remote_addr: peer.addr }
};

self.connection_info.inc_out();
self.connection_info.inc_pendingout();

self.queued_actions.push_back(action);

new_outbound_dials += 1;
if new_outbound_dials > self.connection_info.max_concurrent_outbound_dials {
break
}
}
}

Expand Down Expand Up @@ -822,6 +825,9 @@ pub struct ConnectionInfo {
/// Counter for currently occupied slots for active outbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_outbound: usize,
/// Counter for pending outbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_pendingout: usize,
/// Counter for currently occupied slots for active inbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_inbound: usize,
Expand All @@ -839,7 +845,8 @@ pub struct ConnectionInfo {
impl ConnectionInfo {
/// Returns `true` if there's still capacity for a new outgoing connection.
fn has_out_capacity(&self) -> bool {
self.num_outbound < self.max_outbound
self.num_pendingout < self.max_concurrent_outbound_dials &&
self.num_outbound < self.max_outbound
}

/// Returns `true` if there's still capacity for a new incoming connection.
Expand All @@ -852,6 +859,7 @@ impl ConnectionInfo {
PeerConnectionState::Idle => {}
PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
PeerConnectionState::PendingOut => self.decr_pendingout(),
}
}

Expand All @@ -863,13 +871,21 @@ impl ConnectionInfo {
self.num_outbound += 1;
}

fn inc_pendingout(&mut self) {
self.num_pendingout += 1;
}

fn inc_in(&mut self) {
self.num_inbound += 1;
}

fn decr_in(&mut self) {
self.num_inbound -= 1;
}

fn decr_pendingout(&mut self) {
self.num_pendingout -= 1;
}
}

impl Default for ConnectionInfo {
Expand All @@ -880,6 +896,7 @@ impl Default for ConnectionInfo {
max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize,
max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize,
max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_DIALS,
num_pendingout: 0,
}
}
}
Expand Down Expand Up @@ -1020,6 +1037,8 @@ enum PeerConnectionState {
In,
/// Connected via outgoing connection.
Out,
/// Pending outgoing connection.
PendingOut,
}

// === impl PeerConnectionState ===
Expand All @@ -1044,7 +1063,10 @@ impl PeerConnectionState {
/// Returns whether we're currently connected with this peer
#[inline]
fn is_connected(&self) -> bool {
matches!(self, PeerConnectionState::In | PeerConnectionState::Out)
matches!(
self,
PeerConnectionState::In | PeerConnectionState::Out | PeerConnectionState::PendingOut
)
}

/// Returns if there's currently no connection to that peer.
Expand Down Expand Up @@ -1907,12 +1929,12 @@ mod tests {
}

let p = peers.peers.get_mut(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Out);
assert_eq!(p.state, PeerConnectionState::PendingOut);

peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);

let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
assert_eq!(p.state, PeerConnectionState::PendingOut);
assert!(p.is_banned());

peers.on_active_session_gracefully_closed(peer);
Expand Down Expand Up @@ -1966,7 +1988,7 @@ mod tests {
}

let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Out);
assert_eq!(p.state, PeerConnectionState::PendingOut);

peers.remove_peer(peer);

Expand All @@ -1984,11 +2006,11 @@ mod tests {
}

let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
assert_eq!(p.state, PeerConnectionState::PendingOut);

peers.add_peer(peer, socket_addr, None);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
assert_eq!(p.state, PeerConnectionState::PendingOut);

peers.on_active_session_gracefully_closed(peer);
assert!(!peers.peers.contains_key(&peer));
Expand Down Expand Up @@ -2016,9 +2038,9 @@ mod tests {
}

let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Out);
assert_eq!(p.state, PeerConnectionState::PendingOut);

assert_eq!(peers.num_outbound_connections(), 1);
assert_eq!(peers.num_outbound_connections(), 0);

peers.on_outgoing_connection_failure(
&socket_addr,
Expand Down Expand Up @@ -2283,4 +2305,45 @@ mod tests {
.count();
assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials);
}

#[tokio::test]
async fn test_max_num_of_pending_dials() {
let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);

// add more peers than allowed
for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), socket_addr, None);
}

// generate 'Connect' actions
peer_manager.fill_outbound_slots();

// all dialed connections should be in 'PendingOut' state
let dials = peer_manager.connection_info.num_pendingout;
assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials);

let num_pendingout_states = peer_manager
.peers
.iter()
.filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
.map(|(peer_id, _)| *peer_id)
.collect::<Vec<PeerId>>();
assert_eq!(
num_pendingout_states.len(),
peer_manager.connection_info.max_concurrent_outbound_dials
);

// establish dialed connections
for peer_id in num_pendingout_states.iter() {
peer_manager.on_active_outgoing_established(*peer_id);
}

// all dialed connections should now be in 'Out' state
for peer_id in num_pendingout_states.iter() {
assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
}
}
abhijeetbhagat marked this conversation as resolved.
Show resolved Hide resolved
}
Loading