@@ -23,6 +23,7 @@ use std::time::Duration;
2323use fnv:: FnvHashSet ;
2424use futures:: StreamExt ;
2525use futures:: stream:: FuturesUnordered ;
26+ use itertools:: { Itertools , MinMaxResult } ;
2627use quickwit_actors:: Mailbox ;
2728use quickwit_common:: Progress ;
2829use quickwit_common:: pretty:: PrettySample ;
@@ -81,7 +82,7 @@ const FIRE_AND_FORGET_TIMEOUT: Duration = Duration::from_secs(3);
8182/// All errors are ignored, and not even logged.
8283fn fire_and_forget (
8384 fut : impl Future < Output = ( ) > + Send + ' static ,
84- operation : impl std:: fmt:: Display + Send + Sync + ' static ,
85+ operation : impl std:: fmt:: Display + Send + ' static ,
8586) {
8687 tokio:: spawn ( async move {
8788 if let Err ( _timeout_elapsed) = tokio:: time:: timeout ( FIRE_AND_FORGET_TIMEOUT , fut) . await {
@@ -1008,59 +1009,83 @@ impl IngestController {
10081009 /// If we are missing some ingesters, their shards should still be in the model, but they should
10091010 /// be missing from the ingester pool.
10101011 ///
1011- /// As a result `num_open_shards_per_leader_threshold ` should be inflated.
1012+ /// As a result `target_num_open_shards_per_leader ` should be inflated.
10121013 ///
10131014 /// TODO: this implementation does not consider replicas.
10141015 fn rebalance_compute_shards_to_move ( & self , model : & ControlPlaneModel ) -> Vec < Shard > {
1015- let num_ingesters = self . ingester_pool . len ( ) ;
1016- let mut num_open_shards : usize = 0 ;
1016+ let ingester_ids : Vec < NodeId > = self . ingester_pool . keys ( ) ;
1017+ let num_ingesters = ingester_ids . len ( ) ;
10171018
10181019 if num_ingesters == 0 {
1019- debug ! ( "no ingester available" ) ;
1020+ debug ! ( "no ingesters available" ) ;
1021+ return Vec :: new ( ) ;
1022+ }
1023+ if num_ingesters < 2 {
10201024 return Vec :: new ( ) ;
10211025 }
1022- let mut per_leader_open_shards: HashMap < & str , Vec < & ShardEntry > > = HashMap :: new ( ) ;
1026+ let mut num_open_shards: usize = 0 ;
1027+ let mut per_leader_open_shards: HashMap < & str , Vec < & Shard > > = HashMap :: new ( ) ;
10231028
10241029 for shard in model. all_shards ( ) {
10251030 if shard. is_open ( ) {
10261031 num_open_shards += 1 ;
10271032 per_leader_open_shards
10281033 . entry ( & shard. leader_id )
10291034 . or_default ( )
1030- . push ( shard) ;
1035+ . push ( & shard . shard ) ;
10311036 }
10321037 }
1033- // We tolerate an ingester with 10% more shards than the average.
1034- // Let's first identify the list of shards we want to "move".
1035- let num_open_shards_per_leader_threshold =
1036- ( num_open_shards * 11 ) . div_ceil ( 10 * num_ingesters) ;
1038+ for ingester_id in & ingester_ids {
1039+ per_leader_open_shards
1040+ . entry ( ingester_id. as_str ( ) )
1041+ . or_default ( ) ;
1042+ }
1043+ let target_num_open_shards_per_leader = num_open_shards as f32 / num_ingesters as f32 ;
1044+ let max_num_open_shards_per_leader =
1045+ f32:: ceil ( target_num_open_shards_per_leader * 1.1 ) as usize ;
1046+ let min_num_open_shards_per_leader =
1047+ f32:: floor ( target_num_open_shards_per_leader * 0.9 ) as usize ;
1048+
1049+ let mut rng = thread_rng ( ) ;
1050+ let mut per_leader_open_shard_shuffled: Vec < Vec < & Shard > > = per_leader_open_shards
1051+ . into_values ( )
1052+ . map ( |mut shards| {
1053+ shards. shuffle ( & mut rng) ;
1054+ shards
1055+ } )
1056+ . collect ( ) ;
10371057
10381058 let mut shards_to_move: Vec < Shard > = Vec :: new ( ) ;
10391059
1040- let mut rng = thread_rng ( ) ;
1041- for open_shards in per_leader_open_shards. values ( ) {
1042- if let Some ( num_shards_to_move) = open_shards
1043- . len ( )
1044- . checked_sub ( num_open_shards_per_leader_threshold)
1060+ loop {
1061+ let MinMaxResult :: MinMax ( min_shards, max_shards) = per_leader_open_shard_shuffled
1062+ . iter_mut ( )
1063+ . minmax_by_key ( |shards| shards. len ( ) )
1064+ else {
1065+ break ;
1066+ } ;
1067+ if min_shards. len ( ) < min_num_open_shards_per_leader
1068+ || max_shards. len ( ) > max_num_open_shards_per_leader
10451069 {
1046- shards_to_move . extend (
1047- open_shards [ .. ]
1048- . choose_multiple ( & mut rng , num_shards_to_move )
1049- . map ( |shard_entry| shard_entry . shard . clone ( ) ) ,
1050- ) ;
1070+ let shard = max_shards . pop ( ) . expect ( "shards should not be empty" ) ;
1071+ shards_to_move . push ( shard . clone ( ) ) ;
1072+ min_shards . push ( shard ) ;
1073+ } else {
1074+ break ;
10511075 }
10521076 }
1053- let num_shards_to_rebalance = shards_to_move. len ( ) ;
1077+ let num_shards_to_move = shards_to_move. len ( ) ;
10541078
1055- if num_shards_to_rebalance == 0 {
1079+ if num_shards_to_move == 0 {
10561080 info ! ( "no shards to rebalance" ) ;
10571081 } else {
10581082 info ! (
10591083 num_open_shards,
10601084 num_available_ingesters = num_ingesters,
1061- rebalance_threshold = num_open_shards_per_leader_threshold,
1062- num_shards_to_rebalance,
1063- "rebalancing shards"
1085+ min_shards_threshold = min_num_open_shards_per_leader,
1086+ max_shards_threshold = max_num_open_shards_per_leader,
1087+ num_shards_to_move,
1088+ "rebalancing {num_shards_to_move} shards"
10641089 ) ;
10651090 }
10661091 shards_to_move
0 commit comments