@@ -23,6 +23,7 @@ use std::time::Duration;
2323use fnv:: FnvHashSet ;
2424use futures:: StreamExt ;
2525use futures:: stream:: FuturesUnordered ;
26+ use itertools:: Itertools ;
2627use quickwit_actors:: Mailbox ;
2728use quickwit_common:: Progress ;
2829use quickwit_common:: pretty:: PrettySample ;
@@ -81,7 +82,7 @@ const FIRE_AND_FORGET_TIMEOUT: Duration = Duration::from_secs(3);
8182/// All errors are ignored, and not even logged.
8283fn fire_and_forget (
8384 fut : impl Future < Output = ( ) > + Send + ' static ,
84- operation : impl std:: fmt:: Display + Send + Sync + ' static ,
85+ operation : impl std:: fmt:: Display + Send + ' static ,
8586) {
8687 tokio:: spawn ( async move {
8788 if let Err ( _timeout_elapsed) = tokio:: time:: timeout ( FIRE_AND_FORGET_TIMEOUT , fut) . await {
@@ -1008,62 +1009,93 @@ impl IngestController {
10081009 /// If we are missing some ingesters, their shards should still be in the model, but they should
10091010 /// be missing from the ingester pool.
10101011 ///
1011- /// As a result `num_open_shards_per_leader_threshold ` should be inflated.
1012+ /// As a result `target_num_open_shards_per_leader ` should be inflated.
10121013 ///
10131014 /// TODO this implementation does not consider replica.
10141015 fn rebalance_compute_shards_to_move ( & self , model : & ControlPlaneModel ) -> Vec < Shard > {
1015- let num_ingesters = self . ingester_pool . len ( ) ;
1016- let mut num_open_shards : usize = 0 ;
1016+ let ingester_ids : Vec < NodeId > = self . ingester_pool . keys ( ) ;
1017+ let num_ingesters = ingester_ids . len ( ) ;
10171018
10181019 if num_ingesters == 0 {
1019- debug ! ( "no ingester available" ) ;
1020+ debug ! ( "no ingesters available" ) ;
1021+ return Vec :: new ( ) ;
1022+ }
1023+ if num_ingesters < 2 {
10201024 return Vec :: new ( ) ;
10211025 }
1022- let mut per_leader_open_shards: HashMap < & str , Vec < & ShardEntry > > = HashMap :: new ( ) ;
1026+ let mut num_open_shards: usize = 0 ;
1027+ let mut per_leader_open_shard_entries: HashMap < & str , Vec < & ShardEntry > > = HashMap :: new ( ) ;
10231028
10241029 for shard in model. all_shards ( ) {
10251030 if shard. is_open ( ) {
10261031 num_open_shards += 1 ;
1027- per_leader_open_shards
1032+ per_leader_open_shard_entries
10281033 . entry ( & shard. leader_id )
10291034 . or_default ( )
10301035 . push ( shard) ;
10311036 }
10321037 }
1033- // We tolerate an ingester with 10% more shards than the average.
1034- // Let's first identify the list of shards we want to "move".
1035- let num_open_shards_per_leader_threshold =
1036- ( num_open_shards * 11 ) . div_ceil ( 10 * num_ingesters) ;
1037-
1038- let mut shards_to_move: Vec < Shard > = Vec :: new ( ) ;
1038+ for ingester_id in & ingester_ids {
1039+ per_leader_open_shard_entries
1040+ . entry ( ingester_id. as_str ( ) )
1041+ . or_default ( ) ;
1042+ }
1043+ let target_num_open_shards_per_leader = num_open_shards as f32 / num_ingesters as f32 ;
1044+ let max_num_open_shards_per_leader =
1045+ f32:: ceil ( target_num_open_shards_per_leader * 1.1 ) as usize ;
1046+ let min_num_open_shards_per_leader =
1047+ f32:: floor ( target_num_open_shards_per_leader * 0.9 ) as usize ;
10391048
10401049 let mut rng = thread_rng ( ) ;
1041- for open_shards in per_leader_open_shards. values ( ) {
1042- if let Some ( num_shards_to_move) = open_shards
1043- . len ( )
1044- . checked_sub ( num_open_shards_per_leader_threshold)
1045- {
1046- shards_to_move. extend (
1047- open_shards[ ..]
1048- . choose_multiple ( & mut rng, num_shards_to_move)
1049- . map ( |shard_entry| shard_entry. shard . clone ( ) ) ,
1050- ) ;
1050+ let mut per_leader_open_shards: Vec < Vec < Shard > > = per_leader_open_shard_entries
1051+ . into_values ( )
1052+ . map ( |shard_entries| {
1053+ let mut shards: Vec < Shard > = shard_entries
1054+ . into_iter ( )
1055+ . map ( |shard_entry| shard_entry. shard . clone ( ) )
1056+ . collect ( ) ;
1057+ shards. shuffle ( & mut rng) ;
1058+ shards
1059+ } )
1060+ . sorted_unstable_by_key ( Vec :: len)
1061+ . collect ( ) ;
1062+
1063+ let mut shards_to_rebalance = Vec :: new ( ) ;
1064+ let last_idx = per_leader_open_shards. len ( ) - 1 ;
1065+
1066+ loop {
1067+ if per_leader_open_shards[ 0 ] . len ( ) < min_num_open_shards_per_leader {
1068+ let shard = per_leader_open_shards[ last_idx]
1069+ . pop ( )
1070+ . expect ( "shards should not be empty" ) ;
1071+ per_leader_open_shards[ 0 ] . push ( shard. clone ( ) ) ;
1072+ shards_to_rebalance. push ( shard) ;
1073+ } else if per_leader_open_shards[ last_idx] . len ( ) > max_num_open_shards_per_leader {
1074+ let shard = per_leader_open_shards[ last_idx]
1075+ . pop ( )
1076+ . expect ( "shards should not be empty" ) ;
1077+ per_leader_open_shards[ 0 ] . push ( shard. clone ( ) ) ;
1078+ shards_to_rebalance. push ( shard) ;
1079+ } else {
1080+ break ;
10511081 }
1082+ per_leader_open_shards. sort_unstable_by_key ( Vec :: len) ;
10521083 }
1053- let num_shards_to_rebalance = shards_to_move . len ( ) ;
1084+ let num_shards_to_rebalance = shards_to_rebalance . len ( ) ;
10541085
10551086 if num_shards_to_rebalance == 0 {
10561087 info ! ( "no shards to rebalance" ) ;
10571088 } else {
10581089 info ! (
10591090 num_open_shards,
10601091 num_available_ingesters = num_ingesters,
1061- rebalance_threshold = num_open_shards_per_leader_threshold,
1092+ min_shards_threshold = min_num_open_shards_per_leader,
1093+ max_shards_threshold = max_num_open_shards_per_leader,
10621094 num_shards_to_rebalance,
10631095 "rebalancing shards"
10641096 ) ;
10651097 }
1066- shards_to_move
1098+ shards_to_rebalance
10671099 }
10681100
10691101 /// Moves shards from ingesters with too many shards to ingesters with too few shards. Moving a
0 commit comments