Skip to content

Commit

Permalink
balancer: HashRing: repeat nodes on ring to balance better (#1758)
Browse files Browse the repository at this point in the history
  • Loading branch information
dyc3 committed Apr 29, 2024
1 parent fdb829a commit aa795a2
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
12 changes: 9 additions & 3 deletions crates/ott-balancer/benches/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ fn select_monoliths(c: &mut Criterion) {
});

c.bench_function("hash ring selector, 2 monoliths", |b| {
let strategy = HashRingSelector;
let strategy = HashRingSelector {
config: Default::default(),
};
let mut m1 = build_monolith();
m1.add_room(&"foo1".into()).expect("failed to add room");
m1.add_room(&"foo2".into()).expect("failed to add room");
Expand All @@ -112,7 +114,9 @@ fn select_monoliths(c: &mut Criterion) {
});

c.bench_function("hash ring selector, 5 monoliths", |b| {
let strategy = HashRingSelector;
let strategy = HashRingSelector {
config: Default::default(),
};
let mut m1 = build_monolith();
m1.add_room(&"foo1".into()).expect("failed to add room");
m1.add_room(&"foo2".into()).expect("failed to add room");
Expand All @@ -132,7 +136,9 @@ fn select_monoliths(c: &mut Criterion) {
});

c.bench_function("hash ring selector, 10 monoliths", |b| {
let strategy = HashRingSelector;
let strategy = HashRingSelector {
config: Default::default(),
};
let mut m1 = build_monolith();
m1.add_room(&"foo1".into()).expect("failed to add room");
m1.add_room(&"foo2".into()).expect("failed to add room");
Expand Down
4 changes: 3 additions & 1 deletion crates/ott-balancer/src/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,9 @@ mod test {
BalancerConfig::init_default();
let ctx = Arc::new(RwLock::new(BalancerContext::new()));
ctx.write().await.monolith_selection =
MonolithSelectionStrategy::HashRing(HashRingSelector);
MonolithSelectionStrategy::HashRing(HashRingSelector {
config: Default::default(),
});
let (monolith_outbound_tx_1, _monolith_outbound_rx_1) = tokio::sync::mpsc::channel(100);
let monolith_outbound_tx_1 = Arc::new(monolith_outbound_tx_1);
let (client_inbound_tx_1, _client_inbound_rx_1) = tokio::sync::mpsc::channel(100);
Expand Down
45 changes: 39 additions & 6 deletions crates/ott-balancer/src/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Default for MonolithSelectionStrategy {
pub enum MonolithSelectionConfig {
#[default]
MinRooms,
HashRing,
HashRing(HashRingSelectorConfig),
Random,
}

Expand All @@ -56,8 +56,8 @@ impl From<MonolithSelectionConfig> for MonolithSelectionStrategy {
MonolithSelectionConfig::MinRooms => {
MonolithSelectionStrategy::MinRooms(MinRoomsSelector)
}
MonolithSelectionConfig::HashRing => {
MonolithSelectionStrategy::HashRing(HashRingSelector)
MonolithSelectionConfig::HashRing(config) => {
MonolithSelectionStrategy::HashRing(config.into())
}
MonolithSelectionConfig::Random => MonolithSelectionStrategy::Random(RandomSelector),
}
Expand Down Expand Up @@ -85,17 +85,48 @@ impl MonolithSelection for MinRoomsSelector {
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize)]
pub struct HashRingSelectorConfig {
#[serde(default)]
pub weight: usize,
}

impl Default for HashRingSelectorConfig {
fn default() -> Self {
HashRingSelectorConfig { weight: 5 }
}
}

impl From<HashRingSelectorConfig> for HashRingSelector {
fn from(config: HashRingSelectorConfig) -> Self {
HashRingSelector { config }
}
}

#[derive(Debug, Default, Copy, Clone)]
pub struct HashRingSelector;
pub struct HashRingSelector {
pub config: HashRingSelectorConfig,
}

impl MonolithSelection for HashRingSelector {
fn select_monolith<'a>(
&'a self,
room: &RoomName,
monoliths: Vec<&'a BalancerMonolith>,
) -> anyhow::Result<&BalancerMonolith> {
let weight = self.config.weight.max(1);
let mut ring = HashRing::new();
ring.batch_add(monoliths.iter().map(|m| RingNode { monolith: m }).collect());
ring.batch_add(
monoliths
.iter()
// This makes it so that each monolith is added to the ring 5 times with different hashes to spread the load more evenly
.flat_map(|m| std::iter::repeat(m).take(weight).enumerate())
.map(|(i, m)| RingNode {
monolith: m,
idx: i,
})
.collect(),
);

let node = ring.get(room).ok_or(anyhow::anyhow!("ring hash empty"))?;

Expand All @@ -105,11 +136,13 @@ impl MonolithSelection for HashRingSelector {

struct RingNode<'a> {
monolith: &'a BalancerMonolith,
idx: usize,
}

impl Hash for RingNode<'_> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.monolith.id().hash(state);
self.idx.hash(state);
}
}

Expand Down Expand Up @@ -146,7 +179,7 @@ mod test {

let strategy: MonolithSelectionConfig =
serde_json::from_value(config).expect("failed to parse selection strategy config");
assert_eq!(strategy, MonolithSelectionConfig::HashRing);
assert!(matches!(strategy, MonolithSelectionConfig::HashRing(_)));
}

#[tokio::test]
Expand Down

0 comments on commit aa795a2

Please sign in to comment.