diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 123efbe60b3..a3b85a44dbc 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -23,6 +23,7 @@ use std::time::Duration; use fnv::FnvHashSet; use futures::StreamExt; use futures::stream::FuturesUnordered; +use itertools::{Itertools, MinMaxResult}; use quickwit_actors::Mailbox; use quickwit_common::Progress; use quickwit_common::pretty::PrettySample; @@ -81,7 +82,7 @@ const FIRE_AND_FORGET_TIMEOUT: Duration = Duration::from_secs(3); /// All errors are ignored, and not even logged. fn fire_and_forget( fut: impl Future + Send + 'static, - operation: impl std::fmt::Display + Send + Sync + 'static, + operation: impl std::fmt::Display + Send + 'static, ) { tokio::spawn(async move { if let Err(_timeout_elapsed) = tokio::time::timeout(FIRE_AND_FORGET_TIMEOUT, fut).await { @@ -997,78 +998,9 @@ impl IngestController { } } - /// This method just "computes"" the number of shards to move for rebalance. - /// It does not run any side effect except logging. - /// - /// TODO we consider the number of alive ingesters for this computation, - /// but deal with entire number of shards here. - /// This could cause problems when dealing with a lot of unavailable ingesters. - /// - /// On the other hand it biases thing the "right way": - /// If we are missing some ingesters, their shards should still be in the model, but they should - /// be missing from the ingester pool. - /// - /// As a result `num_open_shards_per_leader_threshold` should be inflated. - /// - /// TODO this implementation does not consider replica. 
-    fn rebalance_compute_shards_to_move(&self, model: &ControlPlaneModel) -> Vec<Shard> {
-        let num_ingesters = self.ingester_pool.len();
-        let mut num_open_shards: usize = 0;
-
-        if num_ingesters == 0 {
-            debug!("no ingester available");
-            return Vec::new();
-        }
-        let mut per_leader_open_shards: HashMap<&str, Vec<&ShardEntry>> = HashMap::new();
-
-        for shard in model.all_shards() {
-            if shard.is_open() {
-                num_open_shards += 1;
-                per_leader_open_shards
-                    .entry(&shard.leader_id)
-                    .or_default()
-                    .push(shard);
-            }
-        }
-        // We tolerate an ingester with 10% more shards than the average.
-        // Let's first identify the list of shards we want to "move".
-        let num_open_shards_per_leader_threshold =
-            (num_open_shards * 11).div_ceil(10 * num_ingesters);
-
-        let mut shards_to_move: Vec<Shard> = Vec::new();
-
-        let mut rng = thread_rng();
-        for open_shards in per_leader_open_shards.values() {
-            if let Some(num_shards_to_move) = open_shards
-                .len()
-                .checked_sub(num_open_shards_per_leader_threshold)
-            {
-                shards_to_move.extend(
-                    open_shards[..]
-                        .choose_multiple(&mut rng, num_shards_to_move)
-                        .map(|shard_entry| shard_entry.shard.clone()),
-                );
-            }
-        }
-        let num_shards_to_rebalance = shards_to_move.len();
-
-        if num_shards_to_rebalance == 0 {
-            info!("no shards to rebalance");
-        } else {
-            info!(
-                num_open_shards,
-                num_available_ingesters = num_ingesters,
-                rebalance_threshold = num_open_shards_per_leader_threshold,
-                num_shards_to_rebalance,
-                "rebalancing shards"
-            );
-        }
-        shards_to_move
-    }
-
-    /// Moves shards from ingesters with too many shards to ingesters with too few shards. Moving a
-    /// shard consists of closing the shard on the source ingester and opening a new one on the
-    /// target ingester.
+    /// Rebalances shards from ingesters with too many shards to ingesters with too few shards.
+    /// Moving a shard consists of closing the shard on the source ingester and opening a new
+    /// one on the target ingester.
/// /// This method is guarded by a lock to ensure that only one rebalance operation is performed at /// a time. @@ -1084,18 +1016,18 @@ impl IngestController { }; self.stats.num_rebalance_shards_ops += 1; - let shards_to_move: Vec = self.rebalance_compute_shards_to_move(model); + let shards_to_rebalance: Vec = self.compute_shards_to_rebalance(model); crate::metrics::CONTROL_PLANE_METRICS .rebalance_shards - .set(shards_to_move.len() as i64); + .set(shards_to_rebalance.len() as i64); - if shards_to_move.is_empty() { + if shards_to_rebalance.is_empty() { return Ok(None); } let mut per_source_num_shards_to_open: HashMap = HashMap::new(); - for shard in &shards_to_move { + for shard in &shards_to_rebalance { *per_source_num_shards_to_open .entry(shard.source_uid()) .or_default() += 1; @@ -1128,9 +1060,10 @@ impl IngestController { model.drain_scaling_permits(source_uid, ScalingMode::Down); } - // Let's close one of the shard to move for every successfully newly opened shards. - let mut shards_to_close = Vec::with_capacity(shards_to_move.len()); - for shard in shards_to_move { + // Close as many shards as we opened. Because `try_open_shards` might fail partially, we + // must only close the shards that we successfully opened. 
+        let mut shards_to_close = Vec::with_capacity(shards_to_rebalance.len());
+        for shard in shards_to_rebalance {
             let source_uid = shard.source_uid();
             let Some(num_open_shards) = per_source_num_opened_shards.get_mut(&source_uid) else {
                 continue;
             };
@@ -1139,14 +1072,7 @@ impl IngestController {
                 continue;
             };
             *num_open_shards -= 1;
-
-            let leader_id = NodeId::from(shard.leader_id.clone());
-            let shard_pkey = ShardPKey {
-                index_uid: shard.index_uid.clone(),
-                source_id: shard.source_id.clone(),
-                shard_id: shard.shard_id.clone(),
-            };
-            shards_to_close.push((leader_id, shard_pkey));
+            shards_to_close.push(shard);
         }
         let mailbox_clone = mailbox.clone();
@@ -1173,13 +1099,112 @@ impl IngestController {
         Ok(Some(tokio::spawn(close_shards_and_send_callback_fut)))
     }
+    /// This method just "computes" the number of shards to move for rebalance.
+    /// It does not run any side effect except logging.
+    ///
+    /// TODO: We consider the number of available (i.e. alive according to chitchat) ingesters for
+    /// this computation, but deal with the entire number of shards here.
+    /// This could cause problems when dealing with a lot of unavailable ingesters.
+    ///
+    /// On the other hand it biases things the "right way":
+    /// If we are missing some ingesters, their shards should still be in the model, but they should
+    /// be missing from the ingester pool.
+    ///
+    /// As a result `target_num_open_shards_per_leader` should be inflated.
+    ///
+    /// TODO: This implementation does not consider replicas.
+    fn compute_shards_to_rebalance(&self, model: &ControlPlaneModel) -> Vec<Shard> {
+        let ingester_ids: Vec<NodeId> = self.ingester_pool.keys();
+        let num_ingesters = ingester_ids.len();
+
+        if num_ingesters == 0 {
+            debug!("no ingesters available");
+            return Vec::new();
+        }
+        if num_ingesters < 2 {
+            return Vec::new();
+        }
+        let mut num_open_shards: usize = 0;
+        let mut per_leader_open_shards: HashMap<&str, Vec<&Shard>> = HashMap::new();
+
+        for shard in model.all_shards() {
+            if shard.is_open() {
+                num_open_shards += 1;
+                per_leader_open_shards
+                    .entry(&shard.leader_id)
+                    .or_default()
+                    .push(&shard.shard);
+            }
+        }
+        for ingester_id in &ingester_ids {
+            per_leader_open_shards
+                .entry(ingester_id.as_str())
+                .or_default();
+        }
+        let target_num_open_shards_per_leader = num_open_shards as f32 / num_ingesters as f32;
+        let max_num_open_shards_per_leader =
+            f32::ceil(target_num_open_shards_per_leader * 1.1) as usize;
+        let min_num_open_shards_per_leader =
+            f32::floor(target_num_open_shards_per_leader * 0.9) as usize;
+
+        let mut rng = thread_rng();
+        let mut per_leader_open_shard_shuffled: Vec<Vec<&Shard>> = per_leader_open_shards
+            .into_values()
+            .map(|mut shards| {
+                shards.shuffle(&mut rng);
+                shards
+            })
+            .collect();
+
+        let mut shards_to_rebalance: Vec<Shard> = Vec::new();
+
+        loop {
+            let MinMaxResult::MinMax(min_shards, max_shards) = per_leader_open_shard_shuffled
+                .iter_mut()
+                .minmax_by_key(|shards| shards.len())
+            else {
+                break;
+            };
+            if min_shards.len() < min_num_open_shards_per_leader
+                || max_shards.len() > max_num_open_shards_per_leader
+            {
+                let shard = max_shards.pop().expect("shards should not be empty");
+                shards_to_rebalance.push(shard.clone());
+                min_shards.push(shard);
+            } else {
+                break;
+            }
+        }
+        let num_shards_to_rebalance = shards_to_rebalance.len();
+
+        if num_shards_to_rebalance == 0 {
+            info!("no shards to rebalance");
+        } else {
+            info!(
+                num_open_shards,
+                num_available_ingesters = num_ingesters,
+                min_shards_threshold = min_num_open_shards_per_leader,
+
max_shards_threshold = max_num_open_shards_per_leader, + num_shards_to_rebalance, + "rebalancing {num_shards_to_rebalance} shards" + ); + } + shards_to_rebalance + } + fn close_shards( &self, - shards_to_close: Vec<(LeaderId, ShardPKey)>, + shards_to_close: Vec, ) -> impl Future> + Send + 'static { let mut per_leader_shards_to_close: HashMap> = HashMap::new(); - for (leader_id, shard_pkey) in shards_to_close { + for shard in shards_to_close { + let shard_pkey = ShardPKey { + index_uid: shard.index_uid, + source_id: shard.source_id, + shard_id: shard.shard_id, + }; + let leader_id = NodeId::from(shard.leader_id); per_leader_shards_to_close .entry(leader_id) .or_default() @@ -3147,46 +3172,41 @@ mod tests { // - ingester 3 will be unavailable. let shards_to_close = vec![ - ( - ingester_id_0.clone(), - ShardPKey { - index_uid: Some(IndexUid::for_test("test-index", 0)), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(0)), - }, - ), - ( - ingester_id_0, - ShardPKey { - index_uid: Some(IndexUid::for_test("test-index", 0)), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - }, - ), - ( - ingester_id_1, - ShardPKey { - index_uid: Some(IndexUid::for_test("test-index", 0)), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - }, - ), - ( - ingester_id_2, - ShardPKey { - index_uid: Some(IndexUid::for_test("test-index", 0)), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(3)), - }, - ), - ( - NodeId::from("test-ingester-3"), - ShardPKey { - index_uid: Some(IndexUid::for_test("test-index", 0)), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(4)), - }, - ), + Shard { + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(0)), + leader_id: ingester_id_0.to_string(), + ..Default::default() + }, + Shard { + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: 
"test-source".to_string(), + shard_id: Some(ShardId::from(1)), + leader_id: ingester_id_0.to_string(), + ..Default::default() + }, + Shard { + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(2)), + leader_id: ingester_id_1.to_string(), + ..Default::default() + }, + Shard { + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(3)), + leader_id: ingester_id_2.to_string(), + ..Default::default() + }, + Shard { + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(4)), + leader_id: "test-ingester-3".to_string(), + ..Default::default() + }, ]; let closed_shards = controller.close_shards(shards_to_close).await; assert_eq!(closed_shards.len(), 1); @@ -3518,4 +3538,142 @@ mod tests { &[NodeIdRef::from_str("node1"), NodeIdRef::from_str("node2")] ); } + + fn test_compute_shards_to_rebalance_aux(shard_allocation: &[usize]) { + let index_id = "test-index"; + let index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); + let index_uid = index_metadata.index_uid.clone(); + let source_id: SourceId = "test-source".to_string(); + + let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata.clone()); + + let mut source_config = SourceConfig::ingest_v2(); + source_config.source_id = source_id.to_string(); + model.add_source(&index_uid, source_config).unwrap(); + + let ingester_pool = IngesterPool::default(); + let mock_ingester = MockIngesterService::new(); + let ingester_client = IngesterServiceClient::from_mock(mock_ingester); + + let ingester_ids: Vec = (0..shard_allocation.len()) + .map(|i| format!("test-ingester-{}", i)) + .collect(); + + for ingester_id in &ingester_ids { + ingester_pool.insert(NodeId::from(ingester_id.clone()), ingester_client.clone()); + } + let mut shards = Vec::new(); + + for (ingester_idx, 
&num_shards) in shard_allocation.iter().enumerate() { + for _ in 0..num_shards { + let shard_id = shards.len() as u64; + let shard = Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.to_string(), + shard_id: Some(ShardId::from(shard_id)), + leader_id: ingester_ids[ingester_idx].clone(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + shards.push(shard); + } + } + model.insert_shards(&index_uid, &source_id, shards.clone()); + + let controller = IngestController::new( + MetastoreServiceClient::mocked(), + ingester_pool.clone(), + 2, // replication_factor + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + 1.001, + ); + let shards_to_rebalance = controller.compute_shards_to_rebalance(&model); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let shard_ids_to_rebalance: Vec = shards_to_rebalance + .iter() + .flat_map(|shard| shard.shard_id.clone()) + .collect(); + + let closed_shard_ids = model.close_shards(&source_uid, &shard_ids_to_rebalance); + assert_eq!(closed_shard_ids.len(), shards_to_rebalance.len()); + + let mut per_ingester_num_shards: HashMap<&str, usize> = HashMap::new(); + + for shard in model.all_shards() { + if shard.is_open() { + *per_ingester_num_shards.entry(&shard.leader_id).or_default() += 1; + } + } + for ingester_id in &ingester_ids { + per_ingester_num_shards + .entry(ingester_id.as_str()) + .or_default(); + } + let mut per_ingester_num_shards_sorted: BTreeSet<(usize, &str)> = per_ingester_num_shards + .into_iter() + .map(|(ingester_id, num_shards)| (num_shards, ingester_id)) + .collect(); + let mut opened_shards: Vec = Vec::new(); + let mut shard_id = shards.len() as u64; + + for _ in 0..shards_to_rebalance.len() { + let (num_shards, ingester_id) = per_ingester_num_shards_sorted.pop_first().unwrap(); + let opened_shard = Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.to_string(), + shard_id: Some(ShardId::from(shard_id)), + leader_id: 
ingester_id.to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + per_ingester_num_shards_sorted.insert((num_shards + 1, ingester_id)); + opened_shards.push(opened_shard); + shard_id += 1; + } + let num_open_shards: usize = per_ingester_num_shards_sorted + .iter() + .map(|(num_shards, _)| num_shards) + .sum(); + let target_num_open_shards_per_leader = num_open_shards as f32 / ingester_ids.len() as f32; + let max_num_open_shards_per_leader = + f32::ceil(target_num_open_shards_per_leader * 1.1) as usize; + let min_num_open_shards_per_leader = + f32::floor(target_num_open_shards_per_leader * 0.9) as usize; + assert!( + per_ingester_num_shards_sorted + .iter() + .all( + |(num_shards, _)| *num_shards >= min_num_open_shards_per_leader + && *num_shards <= max_num_open_shards_per_leader + ) + ); + + // Test stability of the algorithm + model.insert_shards(&index_uid, &source_id, opened_shards); + + let shards_to_rebalance = controller.compute_shards_to_rebalance(&model); + assert!(shards_to_rebalance.is_empty()); + } + + proptest! 
{ + #[test] + fn test_compute_shards_to_rebalance_proptest( + shard_allocation in proptest::collection::vec(0..13usize, 0..13usize), + ) { + test_compute_shards_to_rebalance_aux(&shard_allocation); + } + } + + #[test] + fn test_compute_shards_to_rebalance() { + test_compute_shards_to_rebalance_aux(&[]); + test_compute_shards_to_rebalance_aux(&[0]); + test_compute_shards_to_rebalance_aux(&[1]); + test_compute_shards_to_rebalance_aux(&[0, 1]); + } } diff --git a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs index 9cd6982f847..43f2e352001 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs @@ -297,11 +297,6 @@ impl StableLogMergePolicy { } } -#[cfg(test)] -fn is_sorted(elements: &[usize]) -> bool { - elements.windows(2).all(|w| w[0] <= w[1]) -} - // Helpers which expose some internal properties of // the stable log merge policy to be tested in unit tests. #[cfg(test)] @@ -337,7 +332,8 @@ impl StableLogMergePolicy { levels: &[usize], sorted: bool, ) -> usize { - assert!(is_sorted(levels)); + assert!(levels.is_sorted()); + if num_docs == 0 { return 0; } diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index e986352c5b2..79a6ee9ff79 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; use assert_json_diff::{assert_json_eq, assert_json_include}; @@ -260,24 +261,6 @@ async fn test_slop_queries() { test_sandbox.assert_quit().await; } -// TODO remove me once `Iterator::is_sorted_by_key` is stabilized. 
-fn is_reverse_sorted>(mut it: I) -> bool -where E: Ord { - let mut previous_el = if let Some(first_el) = it.next() { - first_el - } else { - // The empty list is sorted! - return true; - }; - for next_el in it { - if next_el > previous_el { - return false; - } - previous_el = next_el; - } - true -} - #[tokio::test] async fn test_single_node_several_splits() -> anyhow::Result<()> { let index_id = "single-node-several-splits"; @@ -325,7 +308,7 @@ async fn test_single_node_several_splits() -> anyhow::Result<()> { .as_ref() .map(|partial_hit| (partial_hit.split_id.as_str(), partial_hit.doc_id as i32)) }); - assert!(is_reverse_sorted(hit_keys)); + assert!(hit_keys.is_sorted_by(|left, right| left.cmp(right) == Ordering::Greater)); assert!(single_node_result.elapsed_time_micros > 10); assert!(single_node_result.elapsed_time_micros < 1_000_000); test_sandbox.assert_quit().await;