From 2d527026d2173f107075a96cf24ca2ea985c8cda Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu Date: Mon, 1 Dec 2025 13:10:11 +0800 Subject: [PATCH 1/2] load balance framework --- .../iotdb/db/qp/sql/IdentifierParser.g4 | 1 + .../apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +- .../org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 + .../heartbeat/DataNodeHeartbeatHandler.java | 20 +- .../confignode/manager/ConfigManager.java | 89 +++ .../iotdb/confignode/manager/IManager.java | 2 + .../confignode/manager/load/LoadManager.java | 20 + .../manager/load/balancer/RegionBalancer.java | 24 + .../GreedyCopySetRegionGroupMigrator.java | 664 ++++++++++++++++++ .../region/migrator/IRegionGroupMigrator.java | 46 ++ .../load/cache/region/RegionCache.java | 10 +- .../load/cache/region/RegionGroupCache.java | 10 +- .../cache/region/RegionGroupStatistics.java | 9 + .../cache/region/RegionHeartbeatSample.java | 10 + .../load/cache/region/RegionStatistics.java | 11 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 5 + .../GreedyCopySetRegionGroupMigratorTest.java | 206 ++++++ .../db/protocol/client/ConfigNodeClient.java | 6 + .../config/TreeConfigTaskVisitor.java | 8 + .../executor/ClusterConfigTaskExecutor.java | 18 + .../config/executor/IConfigTaskExecutor.java | 2 + .../config/metadata/BalanceRegionsTask.java | 42 ++ .../queryengine/plan/parser/ASTVisitor.java | 6 + .../security/TreeAccessCheckVisitor.java | 7 + .../plan/statement/StatementVisitor.java | 5 + .../metadata/BalanceRegionsStatement.java | 51 ++ .../src/main/thrift/confignode.thrift | 3 + 27 files changed, 1271 insertions(+), 14 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/IRegionGroupMigrator.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigratorTest.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/BalanceRegionsTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/BalanceRegionsStatement.java diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index 4a01b352384c..d0fa0efc70b5 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -48,6 +48,7 @@ keyWords | ASC | ATTRIBUTES | AVAILABLE + | BALANCE | BEFORE | BEGIN | BETWEEN diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index bd49f38ec3c7..8bb06a1dceb4 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -64,7 +64,7 @@ ddlStatement | showVariables | showCluster | showRegions | showDataNodes | showAvailableUrls | showConfigNodes | showClusterId | getRegionId | getTimeSlotList | countTimeSlotList | getSeriesSlotList | migrateRegion | reconstructRegion | extendRegion | removeRegion | removeDataNode | removeConfigNode | removeAINode - | 
verifyConnection + | verifyConnection | balanceRegions // AINode | showAINodes | createModel | dropModel | showModels | showLoadedModels | showAIDevices | callInference | loadModel | unloadModel @@ -562,6 +562,10 @@ verifyConnection : VERIFY CONNECTION (DETAILS)? ; +balanceRegions + : LOAD BALANCE + ; + // ---- Remove DataNode removeDataNode : REMOVE DATANODE dataNodeId=INTEGER_LITERAL diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 5696d5da32a4..8d84d02c4544 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -109,6 +109,10 @@ ATTRIBUTES : A T T R I B U T E S ; +BALANCE + : B A L A N C E + ; + BEFORE : B E F O R E ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index e7a31b1dc73e..fe3863e1d9b9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -104,16 +104,20 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { } // Update RegionGroupCache + RegionHeartbeatSample regionHeartbeatSample = + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + // Region will inherit DataNode's status + nextRegionStatus); + + if (heartbeatResp.isSetRegionDisk() + && heartbeatResp.getRegionDisk().containsKey(regionGroupId.getId())) { + regionHeartbeatSample.setDiskUsage( + heartbeatResp.getRegionDisk().get(regionGroupId.getId())); + } loadManager .getLoadCache() - .cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - // Region will inherit DataNode's status - nextRegionStatus), - false); + .cacheRegionHeartbeatSample(regionGroupId, nodeId, regionHeartbeatSample, false); if (((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 5d4b09adfc71..91c111344f62 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -28,10 +28,12 @@ import org.apache.iotdb.ainode.rpc.thrift.TShowModelsResp; import org.apache.iotdb.ainode.rpc.thrift.TTrainingReq; import org.apache.iotdb.ainode.rpc.thrift.TUnloadModelReq; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; @@ -118,6 +120,7 
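A usage sketch, assuming the existing CLI conventions stay unchanged: the grammar and lexer additions above expose the feature as a two-keyword statement that takes no arguments,

    LOAD BALANCE

which is parsed by the new balanceRegions rule into a BalanceRegionsStatement and then dispatched through BalanceRegionsTask to ConfigManager.balanceRegions() further down in this patch.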
@@ import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.node.NodeMetrics; @@ -291,6 +294,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -3106,4 +3110,89 @@ public DataSet registerAINode(TAINodeRegisterReq req) { resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes()); return resp; } + + @Override + public TSStatus balanceRegions() { + List availableDataNodes = + getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown); + Map availableDataNodeMap = + new HashMap<>(availableDataNodes.size()); + availableDataNodes.forEach( + dataNodeConfiguration -> { + int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); + availableDataNodeMap.put(dataNodeId, dataNodeConfiguration); + }); + Map regionGroupStatisticsMap = + getLoadManager().getLoadCache().getCurrentRegionGroupStatisticsMap(); + List dataRegions = + getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion); + + Map targetRegionGroupMap = + getLoadManager() + .autoBalanceRegionReplicasDistribution( + availableDataNodeMap, + regionGroupStatisticsMap, + dataRegions, + dataRegions.get(0).getDataNodeLocationsSize()); + Map regionCounter = new TreeMap<>(); + for (TRegionReplicaSet regionReplicaSet : targetRegionGroupMap.values()) { + regionReplicaSet + .getDataNodeLocations() + .forEach(location -> regionCounter.merge(location.getDataNodeId(), 1, Integer::sum)); + } + LOGGER.info("[AutoMigration] Target region counter: {}", regionCounter); + + // Process each region sequentially to avoid data race and resource leak + for (TRegionReplicaSet regionReplicaSet : dataRegions) { + try { + int regionId = regionReplicaSet.getRegionId().getId(); + List originalDataNodeIds = + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toList()); + int replicationFactor = originalDataNodeIds.size(); + List targetDataNodeIds = + targetRegionGroupMap.get(regionReplicaSet.getRegionId()).getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toList()); + LOGGER.info( + "[AutoMigration] Start balancing region {} from {} to {}", + regionId, + originalDataNodeIds, + targetDataNodeIds); + boolean[] isDataNodeEmployed = new boolean[replicationFactor]; + Arrays.fill(isDataNodeEmployed, false); + for (int originalId : originalDataNodeIds) { + if (targetDataNodeIds.contains(originalId)) { + // Prune: prefill the overlap replicas + isDataNodeEmployed[targetDataNodeIds.indexOf(originalId)] = true; + } + } + for (int originalId : originalDataNodeIds) { + if (!targetDataNodeIds.contains(originalId)) { + for (int j = 0; j < replicationFactor; j++) { + if (!isDataNodeEmployed[j]) { + isDataNodeEmployed[j] = true; + getProcedureManager() + .migrateRegion( + new TMigrateRegionReq( + regionId, originalId, targetDataNodeIds.get(j), Model.TREE)); + LOGGER.info( + "[AutoMigration] Submit migrating region {} from DataNode {} to 
DataNode {}", + regionId, + originalId, + targetDataNodeIds.get(j)); + break; + } + } + } + } + } catch (Exception e) { + LOGGER.error("Failed to migrate region {} because {}", regionReplicaSet, e.getMessage()); + } + } + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()) + .setMessage( + "Successfully submit migrate regions task! IoTDB will migrate regions automatically."); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 33e77db24907..5641432a871f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -927,4 +927,6 @@ TDescTableResp describeTable( TFetchTableResp fetchTables(final Map> fetchTableMap); TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp); + + TSStatus balanceRegions(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 993bfc0e4006..73019bbfdc3e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -40,6 +41,7 @@ import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.load.service.EventService; import org.apache.iotdb.confignode.manager.load.service.HeartbeatService; @@ -473,6 +475,24 @@ public void forceUpdateConsensusGroupCache( eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary(); } + /** + * Auto balance the RegionReplicas' distribution for cluster RegionGroups. 
+ * + * @param availableDataNodeMap DataNodes that can be used for allocation + * @param regionGroupStatisticsMap Statistics of RegionGroups + * @param allocatedRegionGroups Allocated RegionGroups + * @param replicationFactor Replication factor of TRegionReplicaSet + * @return The optimal TRegionReplicaSet derived by the specified algorithm + */ + public Map autoBalanceRegionReplicasDistribution( + Map availableDataNodeMap, + Map regionGroupStatisticsMap, + List allocatedRegionGroups, + int replicationFactor) { + return regionBalancer.autoBalanceRegionReplicasDistribution( + availableDataNodeMap, regionGroupStatisticsMap, allocatedRegionGroups, replicationFactor); + } + public LoadCache getLoadCache() { return loadCache; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java index 10864e9fba9c..1396e404e988 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java @@ -34,6 +34,9 @@ import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.PartiteGraphPlacementRegionGroupAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.region.migrator.GreedyCopySetRegionGroupMigrator; +import org.apache.iotdb.confignode.manager.load.balancer.region.migrator.IRegionGroupMigrator; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; @@ -50,9 +53,12 @@ public class RegionBalancer { private final IManager configManager; private final IRegionGroupAllocator regionGroupAllocator; + private final IRegionGroupMigrator regionGroupMigrator; public RegionBalancer(IManager configManager) { this.configManager = configManager; + // Todo(xphu): add more migrator algorithms and make it configurable + this.regionGroupMigrator = new GreedyCopySetRegionGroupMigrator(); switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) { case GREEDY: @@ -141,6 +147,24 @@ public CreateRegionGroupsPlan genRegionGroupsAllocationPlan( return createRegionGroupsPlan; } + /** + * Auto balance the RegionReplicas' distribution for cluster RegionGroups. 
+ * + * @param availableDataNodeMap DataNodes that can be used for allocation + * @param regionGroupStatisticsMap Statistics of RegionGroups + * @param allocatedRegionGroups Allocated RegionGroups + * @param replicationFactor Replication factor of TRegionReplicaSet + * @return The optimal TRegionReplicaSet derived by the specified algorithm + */ + public Map autoBalanceRegionReplicasDistribution( + Map availableDataNodeMap, + Map regionGroupStatisticsMap, + List allocatedRegionGroups, + int replicationFactor) { + return regionGroupMigrator.autoBalanceRegionReplicasDistribution( + availableDataNodeMap, regionGroupStatisticsMap, allocatedRegionGroups, replicationFactor); + } + private NodeManager getNodeManager() { return configManager.getNodeManager(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java new file mode 100644 index 000000000000..40515d16680a --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java @@ -0,0 +1,664 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.load.balancer.region.migrator; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class GreedyCopySetRegionGroupMigrator implements IRegionGroupMigrator { + private static final Logger LOGGER = + LoggerFactory.getLogger(GreedyCopySetRegionGroupMigrator.class); + + private int replicationFactor; + // Sorted available DataNodeIds + private int[] dataNodeIds; + // The number of allocated Regions in each DataNode + private int[] regionCounter; + // The number of disk usage in each DataNode + private long[] diskCounter; + // The number of allocated Regions in each DataNode within the same Database + private int[] databaseRegionCounter; + // The number of 2-Region combinations in current cluster + private int[][] combinationCounter; + + // DataNodeId sets for quick lookup + private Set availableFromDataNodeSet; + private Set availableToDataNodeSet; + // For each region, the allowed migration options + private Map> allowedMigrateOptions; + // Statistics of RegionGroups + private Map regionGroupStatisticsMap; + // dfs batch size + private int BATCH_SIZE = 8; + // A list of regions that need to be migrated. + private List dfsRegionKeys; + // Buffer holding best assignment arrays. + private MigrateOption[] bestAssignment; + // An int array holding the best metrics found so far: [maxGlobalLoad, maxDatabaseLoad, + // scatterValue]. 
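As a worked instance of the combinationCounter bookkeeping declared above (placement is hypothetical): if one RegionGroup sits on DataNodes {1, 2, 3} and another on {1, 2, 4}, then combinationCounter[1][2] = combinationCounter[2][1] = 2, while the pairs (1,3), (2,3), (1,4) and (2,4) each count 1 in both directions. scatterDelta below records how a single migration would change these pair counts, which is the quantity the scatter term of the score rewards.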
+ private long[] bestMetrics; + // Pre-calculation, scatterDelta[i][j] means the scatter difference between region i and the + // option j + private int[][] scatterDelta; + + private Map> replicaNodesIdMap; + + // ---------------- Multi-objective weighted-sum scoring (with approximate normalization) + // ---------------- + // Weights (can be tuned later or exposed via configuration) + // Design principle: balance improvement should be valued more than migration cost + // When diskVariance improves by X, it should be worth more than migrating X disk units + private double weightDiskVariance = 4; + private double weightRegionVariance = 3; + private double weightMigrateCost = 2; + private double weightScatter = 1; // note: scatter is maximized, so will be subtracted in score + + // Normalization scales computed per DFS batch + private double scaleDiskVariance = 1.0; + private double scaleRegionVariance = 1.0; + private double scaleMigrateCost = 1.0; + private double scaleScatter = 1.0; + private long scatterMinBound = 0L; // for min-max normalization of scatter + private long scatterMaxBound = 1L; + + // Min-Max bounds per batch for normalization + private long diskVarMinBound = 0L; + private long diskVarMaxBound = 1L; + private long regionVarMinBound = 0L; + private long regionVarMaxBound = 1L; + private long migrateMinBound = 0L; + private long migrateMaxBound = 1L; + + // Best scalar score found so far in current batch (lower is better) + private double bestScore; + // A supportive pruning threshold derived from the best plan's disk variance (max-min) + private long bestDiskVarianceThreshold; + + private static class MigrateOption { + protected final Boolean isMigration; + protected final int fromNodeId; + protected final int toNodeId; + + public MigrateOption(Boolean isMigration, int fromNodeId, int toNodeId) { + this.isMigration = isMigration; + this.fromNodeId = fromNodeId; + this.toNodeId = toNodeId; + } + } + + private void prepare( + Map availableDataNodeMap, + List allocatedRegionGroups, + List databaseAllocatedRegionGroups) { + + // Store the maximum DataNodeId + int maxDataNodeId = + Math.max( + availableDataNodeMap.keySet().stream().max(Integer::compareTo).orElse(0), + allocatedRegionGroups.stream() + .flatMap(regionGroup -> regionGroup.getDataNodeLocations().stream()) + .mapToInt(TDataNodeLocation::getDataNodeId) + .max() + .orElse(0)); + + // Compute regionCounter, databaseRegionCounter and combinationCounter + regionCounter = new int[maxDataNodeId + 1]; + Arrays.fill(regionCounter, 0); + diskCounter = new long[maxDataNodeId + 1]; + Arrays.fill(diskCounter, 0); + databaseRegionCounter = new int[maxDataNodeId + 1]; + Arrays.fill(databaseRegionCounter, 0); + combinationCounter = new int[maxDataNodeId + 1][maxDataNodeId + 1]; + for (int i = 0; i <= maxDataNodeId; i++) { + Arrays.fill(combinationCounter[i], 0); + } + for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) { + List dataNodeLocations = regionReplicaSet.getDataNodeLocations(); + for (int i = 0; i < dataNodeLocations.size(); i++) { + regionCounter[dataNodeLocations.get(i).getDataNodeId()]++; + diskCounter[dataNodeLocations.get(i).getDataNodeId()] += + regionGroupStatisticsMap.get(regionReplicaSet.getRegionId()).getDiskUsage(); + for (int j = i + 1; j < dataNodeLocations.size(); j++) { + combinationCounter[dataNodeLocations.get(i).getDataNodeId()][ + dataNodeLocations.get(j).getDataNodeId()]++; + combinationCounter[dataNodeLocations.get(j).getDataNodeId()][ + dataNodeLocations.get(i).getDataNodeId()]++; + } + } 
+ } + for (TRegionReplicaSet regionReplicaSet : databaseAllocatedRegionGroups) { + List dataNodeLocations = regionReplicaSet.getDataNodeLocations(); + for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { + databaseRegionCounter[dataNodeLocation.getDataNodeId()]++; + } + } + + List dataNodeIdList = new ArrayList<>(availableDataNodeMap.keySet()); + dataNodeIdList.sort(Comparator.comparingLong(node -> diskCounter[node])); + availableToDataNodeSet = new HashSet<>(); + for (int i = 0; i < 1; i++) { + availableToDataNodeSet.add(dataNodeIdList.get(i)); + System.out.println("Available To Node: " + dataNodeIdList.get(i)); + } + availableFromDataNodeSet = new HashSet<>(); + for (int i = 1; i < dataNodeIdList.size(); i++) { + availableFromDataNodeSet.add(dataNodeIdList.get(i)); + System.out.println("Available From Node: " + dataNodeIdList.get(i)); + } + } + + @Override + public Map autoBalanceRegionReplicasDistribution( + Map availableDataNodeMap, + Map regionGroupStatisticsMap, + List allocatedRegionGroups, + int replicationFactor) { + this.regionGroupStatisticsMap = regionGroupStatisticsMap; + this.replicationFactor = replicationFactor; + this.replicaNodesIdMap = + allocatedRegionGroups.stream() + .collect( + Collectors.toMap( + TRegionReplicaSet::getRegionId, + regionReplicaSet -> + regionReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toList()))); + // 1. prepare: compute regionCounter, databaseRegionCounter, and combinationCounter + prepare(availableDataNodeMap, allocatedRegionGroups, Collections.emptyList()); + + // 2. Build allowed migration set for each region. + // No migration: 1 option. + // Migrate once: Select a source replica from ∈ ReplicaSet and a target node to ∈ Nodes \ + // ReplicaSet, for a total of r * (n - r) options. + // So the total options are r * (n - r) + 1. + List regionKeys = + allocatedRegionGroups.stream() + .map(TRegionReplicaSet::getRegionId) + .collect(Collectors.toList()); + allowedMigrateOptions = new HashMap<>(); + for (TConsensusGroupId regionId : regionKeys) { + List migrateOptions = new ArrayList<>(); + // No migration option + migrateOptions.add(new MigrateOption(false, -1, -1)); + // Migration options + List replicaNodeIds = replicaNodesIdMap.get(regionId); + for (Integer fromNodeId : replicaNodeIds) { + for (Integer toNodeId : availableDataNodeMap.keySet()) { + if (!replicaNodeIds.contains(toNodeId) + && availableFromDataNodeSet.contains(fromNodeId) + && availableToDataNodeSet.contains(toNodeId)) { + migrateOptions.add(new MigrateOption(true, fromNodeId, toNodeId)); + } + } + } + migrateOptions.sort( + Comparator.comparingLong( + option -> option.isMigration ? diskCounter[option.toNodeId] : Long.MAX_VALUE)); + allowedMigrateOptions.put(regionId, migrateOptions); + } + + // Sort regionKeys by the maximum current disk load among its involved replica nodes (desc) + regionKeys.sort( + (a, b) -> { + long maxA = + replicaNodesIdMap.get(a).stream() + .mapToLong(nodeId -> diskCounter[nodeId]) + .max() + .orElse(0L); + long maxB = + replicaNodesIdMap.get(b).stream() + .mapToLong(nodeId -> diskCounter[nodeId]) + .max() + .orElse(0L); + return Long.compare(maxB, maxA); + }); + + // 3. 
Batch DFS + Map result = new HashMap<>(); + + for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) { + dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, regionKeys.size())); + int batchSize = dfsRegionKeys.size(); + + // currentAssignment holds the candidate option chosen for the region at that index + MigrateOption[] currentAssignment = new MigrateOption[batchSize]; + // Initialize buffer + bestAssignment = new MigrateOption[batchSize]; + Arrays.fill(bestAssignment, new MigrateOption(false, -1, -1)); + // Initialize batch-level normalization scales and best score + long initialDiskVariance = computeDiskVariance(); + long initialRegionVariance = computeRegionVariance(); + // Estimate an upper bound of migrate cost as the sum of all regions' disk usage within the + // batch + long migrateCostUpperBound = 0L; + for (int i = 0; i < batchSize; i++) { + migrateCostUpperBound += regionGroupStatisticsMap.get(dfsRegionKeys.get(i)).getDiskUsage(); + } + // Min-Max bounds: + // - Disk variance and region variance use max-min definition; conservative upper bound by + // adding 2*sum(batchDisk) and 2*batchSize respectively + diskVarMinBound = 0L; + diskVarMaxBound = initialDiskVariance + 2L * migrateCostUpperBound; + regionVarMinBound = 0L; + regionVarMaxBound = initialRegionVariance + 2L * batchSize; + // For migrate cost, use a more reasonable upper bound that relates to the potential + // balance improvement. If we can improve diskVariance by X, migrating X units should + // be considered acceptable. So we scale migrateCost relative to initialDiskVariance. + // This ensures that migration cost is normalized in a way that's comparable to balance + // improvement. + migrateMinBound = 0L; + // Use max of: batch cost upper bound, or a fraction of initial disk variance + // This makes migrateCost normalization more comparable to diskVariance improvement + migrateMaxBound = Math.max(migrateCostUpperBound, Math.max(1L, initialDiskVariance)); + // Scales (1 / (max - min)) + scaleDiskVariance = 1.0 / Math.max(1L, (diskVarMaxBound - diskVarMinBound)); + scaleRegionVariance = 1.0 / Math.max(1L, (regionVarMaxBound - regionVarMinBound)); + scaleMigrateCost = 1.0 / Math.max(1L, (migrateMaxBound - migrateMinBound)); + // Precompute scatter bounds (min and max possible deltas over the batch) + long tmpScatterMin = 0L; + long tmpScatterMax = 0L; + // We'll fill scatterDelta first, then compute per-index min/max across options below + + // pre-calculate scatterDelta + scatterDelta = new int[batchSize][]; + for (int i = 0; i < batchSize; i++) { + TConsensusGroupId regionId = dfsRegionKeys.get(i); + List replicaNodeIds = replicaNodesIdMap.get(regionId); + List options = allowedMigrateOptions.get(regionId); + int optionSize = options.size(); + scatterDelta[i] = new int[optionSize]; + scatterDelta[i][0] = 0; + for (int j = 0; j < optionSize; j++) { + MigrateOption option = options.get(j); + int fromNodeId = option.fromNodeId; + int toNodeId = option.toNodeId; + int newScatter = 0; + int currentScatter = 0; + if (option.isMigration) { + for (Integer replicaNodeId : replicaNodeIds) { + if (replicaNodeId == fromNodeId) { + continue; + } + newScatter += combinationCounter[replicaNodeId][toNodeId]; + currentScatter += combinationCounter[replicaNodeId][fromNodeId]; + } + } + scatterDelta[i][j] = newScatter - currentScatter; + } + // compute min/max for this index + int minAtI = Integer.MAX_VALUE; + int maxAtI = Integer.MIN_VALUE; + for (int v : scatterDelta[i]) { + if (v < minAtI) { + 
minAtI = v; + } + if (v > maxAtI) { + maxAtI = v; + } + } + tmpScatterMin += minAtI; + tmpScatterMax += maxAtI; + } + + scatterMinBound = tmpScatterMin; + scatterMaxBound = Math.max(tmpScatterMin + 1L, tmpScatterMax); // ensure range>=1 + scaleScatter = 1.0 / Math.max(1L, (scatterMaxBound - scatterMinBound)); + + bestScore = Double.POSITIVE_INFINITY; + bestDiskVarianceThreshold = Long.MAX_VALUE; + // Initialize bestMetrics with maximum values to represent "no solution found yet" + bestMetrics = new long[] {Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE}; + + dfsOptimalDistribution(0, 0, currentAssignment, 0); + + for (int i = 0; i < batchSize; i++) { + TConsensusGroupId regionId = dfsRegionKeys.get(i); + result.put(regionId, bestAssignment[i]); + + // update combinationCounter + MigrateOption option = bestAssignment[i]; + if (option.isMigration) { + List replicaNodeIds = replicaNodesIdMap.get(regionId); + for (Integer replicaNodeId : replicaNodeIds) { + if (replicaNodeId == option.fromNodeId) { + continue; + } + combinationCounter[replicaNodeId][option.fromNodeId]--; + combinationCounter[option.fromNodeId][replicaNodeId]--; + combinationCounter[replicaNodeId][option.toNodeId]++; + combinationCounter[option.toNodeId][replicaNodeId]++; + } + long regionDisk = regionGroupStatisticsMap.get(regionId).getDiskUsage(); + regionCounter[option.fromNodeId]--; + regionCounter[option.toNodeId]++; + databaseRegionCounter[option.fromNodeId]--; + databaseRegionCounter[option.toNodeId]++; + diskCounter[option.fromNodeId] -= regionDisk; + diskCounter[option.toNodeId] += regionDisk; + } + } + } + // 4. Construct the result + Map finalResult = new HashMap<>(); + for (TConsensusGroupId regionId : regionKeys) { + TRegionReplicaSet currentRegionReplicaSet = new TRegionReplicaSet(); + currentRegionReplicaSet.setRegionId(regionId); + MigrateOption option = result.get(regionId); + List replicaNodeIds = replicaNodesIdMap.get(regionId); + for (Integer nodeId : replicaNodeIds) { + if (option.isMigration && nodeId == option.fromNodeId) { + currentRegionReplicaSet.addToDataNodeLocations( + availableDataNodeMap.get(option.toNodeId).getLocation()); + } else { + currentRegionReplicaSet.addToDataNodeLocations( + availableDataNodeMap.get(nodeId).getLocation()); + } + } + finalResult.put(regionId, currentRegionReplicaSet); + } + + return finalResult; + } + + private void dfsOptimalDistribution( + int index, int currentScatter, MigrateOption[] currentAssignment, long migrateCost) { + long[] currentMetrics = evaluateCurrentAssignment(currentScatter, migrateCost); + double currentScore = computeScore(currentMetrics); + + if (index == dfsRegionKeys.size()) { + // Log the complete migration plan and scores + // logMigrationPlanAndScores(currentAssignment, currentMetrics, currentScore, bestScore); + + if (currentScore < bestScore) { + bestScore = currentScore; + bestDiskVarianceThreshold = currentMetrics[0]; + System.arraycopy(currentAssignment, 0, bestAssignment, 0, currentAssignment.length); + System.arraycopy(currentMetrics, 0, bestMetrics, 0, currentMetrics.length); + // LOGGER.info("✓ Selected this migration plan (new best score: {})", currentScore); + } else { + // LOGGER.info("✗ Did not select this migration plan (current score: {} >= best + // score: {})", currentScore, bestScore); + } + return; + } + // Try each candidate option for the region at the current index. 
+ TConsensusGroupId regionId = dfsRegionKeys.get(index); + List options = allowedMigrateOptions.get(regionId); + // Explore options in a promising order: estimated next max disk load asc, then scatterDelta + // asc. + Integer[] orderedIndices = new Integer[options.size()]; + for (int i = 0; i < options.size(); i++) { + orderedIndices[i] = i; + } + final long regionDiskUsage = regionGroupStatisticsMap.get(regionId).getDiskUsage(); + Arrays.sort( + orderedIndices, + (i1, i2) -> { + MigrateOption o1 = options.get(i1); + MigrateOption o2 = options.get(i2); + long d1 = o1.isMigration ? diskCounter[o1.toNodeId] : Long.MAX_VALUE; + long d2 = o2.isMigration ? diskCounter[o2.toNodeId] : Long.MAX_VALUE; + if (d1 != d2) { + return Long.compare(d1, d2); + } + long r1 = o1.isMigration ? regionCounter[o1.toNodeId] : Integer.MAX_VALUE; + long r2 = o2.isMigration ? regionCounter[o2.toNodeId] : Integer.MAX_VALUE; + return Long.compare(r1, r2); + }); + int beamLimit = Math.min(BATCH_SIZE, orderedIndices.length); + for (int k = 0; k < beamLimit; k++) { + int optionIndex = orderedIndices[k]; + MigrateOption option = options.get(optionIndex); + // prune by estimated next max disk load upper bound + long estimatedNextMax = estimateNextMaxDiskLoad(option, regionDiskUsage); + if (estimatedNextMax > bestDiskVarianceThreshold) { + // continue; + } + currentAssignment[index] = option; + long regionDisk = + option.isMigration ? regionGroupStatisticsMap.get(regionId).getDiskUsage() : 0; + // Update counters + if (option.isMigration) { + regionCounter[option.fromNodeId]--; + regionCounter[option.toNodeId]++; + databaseRegionCounter[option.fromNodeId]--; + databaseRegionCounter[option.toNodeId]++; + diskCounter[option.fromNodeId] -= regionDisk; + diskCounter[option.toNodeId] += regionDisk; + } + // Update scatter + int newScatter = currentScatter + scatterDelta[index][optionIndex]; + long nextMigrateCost = migrateCost + regionDisk; + // Recurse + dfsOptimalDistribution(index + 1, newScatter, currentAssignment, nextMigrateCost); + // Backtrack: restore counters + if (option.isMigration) { + regionCounter[option.fromNodeId]++; + regionCounter[option.toNodeId]--; + databaseRegionCounter[option.fromNodeId]++; + databaseRegionCounter[option.toNodeId]--; + diskCounter[option.fromNodeId] += regionDisk; + diskCounter[option.toNodeId] -= regionDisk; + } + } + } + + private long[] evaluateCurrentAssignment(int currentScatter, long migrateCost) { + long diskVariance = computeDiskVariance(); + long regionVariance = computeRegionVariance(); + return new long[] {diskVariance, regionVariance, migrateCost, currentScatter}; + } + + private double computeScore(long[] metrics) { + // metrics: [diskVariance, regionVariance, migrateCost, currentScatter] + // Min-Max normalization: (X - X_min) / (X_max - X_min) + double normDisk = + (Math.min(diskVarMaxBound, Math.max(diskVarMinBound, metrics[0])) - diskVarMinBound) + * scaleDiskVariance; + double normRegion = + (Math.min(regionVarMaxBound, Math.max(regionVarMinBound, metrics[1])) - regionVarMinBound) + * scaleRegionVariance; + double normMigrate = + (Math.min(migrateMaxBound, Math.max(migrateMinBound, metrics[2])) - migrateMinBound) + * scaleMigrateCost; + // scatter is to be maximized, do min-max normalization over estimated bounds + double normScatter = + (Math.min(scatterMaxBound, Math.max(scatterMinBound, metrics[3])) - scatterMinBound) + * scaleScatter; + // Weighted sum: minimize (disk + region + migrate) - scatter + return weightDiskVariance * normDisk + + weightRegionVariance * 
normRegion + + weightMigrateCost * normMigrate + - weightScatter * normScatter; + } + + private void logMigrationPlanAndScores( + MigrateOption[] assignment, long[] metrics, double totalScore, double bestScore) { + // Build migration plan description + StringBuilder planBuilder = new StringBuilder(); + int migrationCount = 0; + for (int i = 0; i < assignment.length; i++) { + MigrateOption option = assignment[i]; + if (option.isMigration) { + if (migrationCount > 0) { + planBuilder.append(", "); + } + planBuilder + .append("Region") + .append(dfsRegionKeys.get(i).getId()) + .append(": ") + .append(option.fromNodeId) + .append("->") + .append(option.toNodeId); + migrationCount++; + } + } + + if (dfsRegionKeys.get(0).getId() >= 8) { + return; + } + + String plan = migrationCount == 0 ? "No migration" : planBuilder.toString(); + + // Calculate normalized and weighted scores + double normDisk = + (Math.min(diskVarMaxBound, Math.max(diskVarMinBound, metrics[0])) - diskVarMinBound) + * scaleDiskVariance; + double normRegion = + (Math.min(regionVarMaxBound, Math.max(regionVarMinBound, metrics[1])) - regionVarMinBound) + * scaleRegionVariance; + double normMigrate = + (Math.min(migrateMaxBound, Math.max(migrateMinBound, metrics[2])) - migrateMinBound) + * scaleMigrateCost; + double normScatter = + (Math.min(scatterMaxBound, Math.max(scatterMinBound, metrics[3])) - scatterMinBound) + * scaleScatter; + + double weightedDisk = weightDiskVariance * normDisk; + double weightedRegion = weightRegionVariance * normRegion; + double weightedMigrate = weightMigrateCost * normMigrate; + double weightedScatter = -weightScatter * normScatter; // Note: scatter is subtracted + + LOGGER.info( + "=== Migration Plan Evaluation ===\n" + + " Migration plan: {}\n" + + " Raw metrics: diskVariance={}, regionVariance={}, migrateCost={}, scatter={}\n" + + " Normalized values: normDisk={}, normRegion={}, normMigrate={}, normScatter={}\n" + + " Weighted scores: disk={} (weight={}), region={} (weight={}), migrate={} (weight={}), scatter={} (weight={})\n" + + " Total score: {} (current best: {})", + plan, + metrics[0], + metrics[1], + metrics[2], + metrics[3], + String.format("%.4f", normDisk), + String.format("%.4f", normRegion), + String.format("%.4f", normMigrate), + String.format("%.4f", normScatter), + String.format("%.4f", weightedDisk), + weightDiskVariance, + String.format("%.4f", weightedRegion), + weightRegionVariance, + String.format("%.4f", weightedMigrate), + weightMigrateCost, + String.format("%.4f", weightedScatter), + weightScatter, + String.format("%.4f", totalScore), + bestScore == Double.POSITIVE_INFINITY ? "∞" : String.format("%.4f", bestScore)); + } + + private long computeDiskVariance() { + // Consider all available nodes (both from and to sets), not just nodes with usage > 0 + // This ensures we correctly calculate variance even when some nodes have 0 usage + Set allAvailableNodes = new HashSet<>(); + allAvailableNodes.addAll(availableFromDataNodeSet); + allAvailableNodes.addAll(availableToDataNodeSet); + + if (allAvailableNodes.isEmpty()) { + return 0L; + } + + // Calculate mean + double sum = 0.0; + int count = 0; + for (Integer nodeId : allAvailableNodes) { + long value = (nodeId < diskCounter.length) ? diskCounter[nodeId] : 0L; + sum += value; + count++; + } + if (count == 0) { + return 0L; + } + double mean = sum / count; + + // Calculate variance: Σ(xi - mean)² / n + double sq = 0.0; + for (Integer nodeId : allAvailableNodes) { + long value = (nodeId < diskCounter.length) ? 
diskCounter[nodeId] : 0L; + double diff = value - mean; + sq += diff * diff; + } + double variance = sq / count; + return Math.round(variance); + } + + private long computeRegionVariance() { + // Consider all available nodes (both from and to sets), not just nodes with regions > 0 + // This ensures we correctly calculate variance even when some nodes have 0 regions + Set allAvailableNodes = new HashSet<>(); + allAvailableNodes.addAll(availableFromDataNodeSet); + allAvailableNodes.addAll(availableToDataNodeSet); + + if (allAvailableNodes.isEmpty()) { + return 0L; + } + + // Calculate mean + double sum = 0.0; + int count = 0; + for (Integer nodeId : allAvailableNodes) { + int value = (nodeId < regionCounter.length) ? regionCounter[nodeId] : 0; + sum += value; + count++; + } + if (count == 0) { + return 0L; + } + double mean = sum / count; + + // Calculate variance: Σ(xi - mean)² / n + double sq = 0.0; + for (Integer nodeId : allAvailableNodes) { + int value = (nodeId < regionCounter.length) ? regionCounter[nodeId] : 0; + double diff = value - mean; + sq += diff * diff; + } + double variance = sq / count; + return Math.round(variance); + } + + private long estimateNextMaxDiskLoad(MigrateOption option, long regionDiskUsage) { + long currentMax = Arrays.stream(diskCounter).max().orElse(0L); + if (!option.isMigration) { + return currentMax; + } + long toAfter = diskCounter[option.toNodeId] + regionDiskUsage; + // Conservative upper bound: new maximum cannot be less than max(currentMax, toAfter) + return Math.max(currentMax, toAfter); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/IRegionGroupMigrator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/IRegionGroupMigrator.java new file mode 100644 index 000000000000..f191f012244c --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/IRegionGroupMigrator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.region.migrator; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; + +import java.util.List; +import java.util.Map; + +public interface IRegionGroupMigrator { + + /** + * Auto balance the RegionReplicas' distribution for cluster RegionGroups. 
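For reference, a worked instance of the population-variance formula used by computeDiskVariance (the loads are hypothetical): with three available DataNodes holding 100, 200 and 300 units of region disk, the mean is 200 and the variance is ((100-200)^2 + (200-200)^2 + (300-200)^2) / 3 = 20000 / 3, which Math.round turns into 6667. computeRegionVariance applies the same formula to the per-node region counts.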
+ * + * @param availableDataNodeMap DataNodes that can be used for allocation + * @param regionGroupStatisticsMap Statistics of RegionGroups + * @param allocatedRegionGroups Allocated RegionGroups + * @param replicationFactor Replication factor of TRegionReplicaSet + * @return The optimal TRegionReplicaSet derived by the specified algorithm + */ + Map autoBalanceRegionReplicasDistribution( + Map availableDataNodeMap, + Map regionGroupStatisticsMap, + List allocatedRegionGroups, + int replicationFactor); +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java index a374c923f240..5229c53b7ac1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java @@ -35,6 +35,7 @@ */ public class RegionCache extends AbstractLoadCache { private final Pair id; + private long lastDiskUsage = 0L; public RegionCache(int dataNodeId, TConsensusGroupId gid) { super(); @@ -51,17 +52,24 @@ public synchronized void updateCurrentStatistics(boolean forceUpdate) { history = Collections.unmodifiableList(slidingWindow); RegionStatus status; + long diskUsage; long currentNanoTime = System.nanoTime(); if (lastSample == null) { /* First heartbeat not received from this region, status is UNKNOWN */ status = RegionStatus.Unknown; + diskUsage = 0; } else if (!failureDetector.isAvailable(id, history)) { /* Failure detector decides that this region is UNKNOWN */ status = RegionStatus.Unknown; + diskUsage = lastSample.getDiskUsage(); } else { status = lastSample.getStatus(); + diskUsage = lastSample.getDiskUsage(); } - this.currentStatistics.set(new RegionStatistics(currentNanoTime, status)); + if (diskUsage != 0L) { + lastDiskUsage = diskUsage; + } + this.currentStatistics.set(new RegionStatistics(currentNanoTime, status, lastDiskUsage)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java index e441054b1dfb..11fcff9bd9b0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java @@ -108,9 +108,15 @@ public void updateCurrentStatistics() { TreeMap::new, (map, entry) -> map.put(entry.getKey(), entry.getValue().getCurrentStatistics()), TreeMap::putAll); - currentStatistics.set( + RegionGroupStatistics regionGroupStatistics = new RegionGroupStatistics( - caculateRegionGroupStatus(regionStatisticsMap), regionStatisticsMap)); + caculateRegionGroupStatus(regionStatisticsMap), regionStatisticsMap); + long diskUsage = 0; + for (RegionStatistics regionStatistics : regionStatisticsMap.values()) { + diskUsage += regionStatistics.getDiskUsage(); + } + regionGroupStatistics.setDiskUsage(diskUsage); + currentStatistics.set(regionGroupStatistics); } private RegionGroupStatus caculateRegionGroupStatus( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java index c8d262790dd9..1ffeea4f9a84 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupStatistics.java @@ -33,6 +33,7 @@ public class RegionGroupStatistics { private final RegionGroupStatus regionGroupStatus; private final Map regionStatisticsMap; + private long diskUsage = 0; public RegionGroupStatistics( RegionGroupStatus regionGroupStatus, Map regionStatisticsMap) { @@ -48,6 +49,14 @@ public RegionGroupStatus getRegionGroupStatus() { return regionGroupStatus; } + public long getDiskUsage() { + return diskUsage; + } + + public void setDiskUsage(long diskUsage) { + this.diskUsage = diskUsage; + } + /** * Get the specified Region's status. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java index fdce5e71def4..636b0eb7ab73 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java @@ -27,10 +27,12 @@ public class RegionHeartbeatSample extends AbstractHeartbeatSample { private final RegionStatus status; + private long diskUsage = 0; public RegionHeartbeatSample(long sampleNanoTimestamp, RegionStatus status) { super(sampleNanoTimestamp); this.status = status; + this.diskUsage = 0; } @TestOnly @@ -42,4 +44,12 @@ public RegionHeartbeatSample(RegionStatus status) { public RegionStatus getStatus() { return status; } + + public void setDiskUsage(long diskUsage) { + this.diskUsage = diskUsage; + } + + public long getDiskUsage() { + return diskUsage; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java index 84f22ef4ac57..a2b2e1a4ef05 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionStatistics.java @@ -29,26 +29,33 @@ public class RegionStatistics extends AbstractStatistics { private final RegionStatus regionStatus; + private final long diskUsage; - public RegionStatistics(long statisticsNanoTimestamp, RegionStatus regionStatus) { + public RegionStatistics(long statisticsNanoTimestamp, RegionStatus regionStatus, long diskUsage) { super(statisticsNanoTimestamp); this.regionStatus = regionStatus; + this.diskUsage = diskUsage; } @TestOnly public RegionStatistics(RegionStatus regionStatus) { super(System.nanoTime()); this.regionStatus = regionStatus; + this.diskUsage = 0; } public static RegionStatistics generateDefaultRegionStatistics() { - return new RegionStatistics(0, RegionStatus.Unknown); + return new RegionStatistics(0, RegionStatus.Unknown, 0); } public RegionStatus getRegionStatus() { return regionStatus; } + public long getDiskUsage() { + return diskUsage; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 59ce7352312f..c4bc2e348852 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -1462,4 +1462,9 @@ public TSStatus createTableView(final TCreateTableViewReq req) { public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) { return configManager.pushHeartbeat(dataNodeId, resp); } + + @Override + public TSStatus balanceRegions() { + return configManager.balanceRegions(); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigratorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigratorTest.java new file mode 100644 index 000000000000..475a2407e03d --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigratorTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region.migrator;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.balancer.region.PartiteGraphPlacementRegionGroupAllocator;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+@RunWith(Parameterized.class)
+public class GreedyCopySetRegionGroupMigratorTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(GreedyCopySetRegionGroupMigratorTest.class);
+
+  private static final PartiteGraphPlacementRegionGroupAllocator ALLOCATOR =
+      new PartiteGraphPlacementRegionGroupAllocator();
+  private static final GreedyCopySetRegionGroupMigrator MIGRATOR =
+      new GreedyCopySetRegionGroupMigrator();
+
+  // Test parameters
+  private final int nodeCount; // n - number of nodes
+  private final int regionCount; // |R| - number of regions
+  private final int replicaCount; // r - replication factor
+
+  // Instance variables to avoid interference between tests
+  private final Map<Integer, TDataNodeConfiguration> beforeNodeMap = new TreeMap<>();
+  private final Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new TreeMap<>();
+  private final Map<Integer, Double> beforeSpaceMap = new TreeMap<>();
+  private final Map<Integer, Double> freeSpaceMap = new TreeMap<>();
+
+  @Parameterized.Parameters(name = "n={0}, |R|={1}, r={2}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][] {
+          {3, 9, 2}, // n=3, |R|=9, r=2, dataRegionPerNode=6
+          {4, 12, 3}, // n=4, |R|=12, r=3, dataRegionPerNode=9
+          {6, 30, 5}, // n=6, |R|=30, r=5, dataRegionPerNode=25
+          {10, 100, 2}, // n=10, |R|=100, r=2, dataRegionPerNode=20
+          {10, 100, 3}, // n=10, |R|=100, r=3, dataRegionPerNode=30
+          {10, 100, 5}, // n=10, |R|=100, r=5, dataRegionPerNode=50
+          {100, 500, 2}, // n=100, |R|=500, r=2, dataRegionPerNode=10
+          {100, 500, 3}, // n=100, |R|=500, r=3, dataRegionPerNode=15
+        });
+  }
+
+  public GreedyCopySetRegionGroupMigratorTest(int nodeCount, int regionCount, int replicaCount) {
+    this.nodeCount = nodeCount;
+    this.regionCount = regionCount;
+    this.replicaCount = replicaCount;
+  }
+
+  @Test
+  public void autoBalanceRegionReplicasDistribution() {
+    LOGGER.info(
+        "Starting test: nodeCount(n)={}, regionCount(|R|)={}, replicaCount(r)={}",
+        nodeCount,
+        regionCount,
+        replicaCount);
+
+    Random random = new Random();
+    beforeNodeMap.clear();
+    availableDataNodeMap.clear();
+    beforeSpaceMap.clear();
+    freeSpaceMap.clear();
+
+    // Initialize nodes: the first n-1 nodes exist before migration
+    for (int i = 1; i <= nodeCount - 1; i++) {
+      TDataNodeConfiguration dataNodeConfiguration =
+          new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i));
+      double space = random.nextDouble();
+      beforeNodeMap.put(i, dataNodeConfiguration);
+      availableDataNodeMap.put(i, dataNodeConfiguration);
+      beforeSpaceMap.put(i, space);
+      freeSpaceMap.put(i, space);
+    }
+    // The nth node is a newly added node
+    for (int i = nodeCount; i <= nodeCount; i++) {
+      availableDataNodeMap.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      freeSpaceMap.put(i, random.nextDouble());
+    }
+
+    // Generate initial region allocation
+    List<TRegionReplicaSet> allocatedResult = new ArrayList<>();
+    for (int i = 0; i < regionCount; i++) {
+      allocatedResult.add(
+          ALLOCATOR.generateOptimalRegionReplicasDistribution(
+              beforeNodeMap,
+              beforeSpaceMap,
+              allocatedResult,
+              allocatedResult,
+              replicaCount,
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, i)));
+    }
+
+    // Create mock statistics
+    Map<TConsensusGroupId, RegionGroupStatistics> fakeStatisticsMap = new TreeMap<>();
+    allocatedResult.forEach(
+        regionGroup -> {
+          RegionGroupStatistics fakeStatistics =
+              RegionGroupStatistics.generateDefaultRegionGroupStatistics();
+          fakeStatistics.setDiskUsage(100);
+          fakeStatisticsMap.put(regionGroup.getRegionId(), fakeStatistics);
+        });
+
+    // Record statistics before migration
+    Map<Integer, Integer> beforeRegionCounter = new TreeMap<>();
+    Map<Integer, Long> beforeDiskCounter = new TreeMap<>();
+    for (TRegionReplicaSet regionReplicaSet : allocatedResult) {
+      regionReplicaSet
+          .getDataNodeLocations()
+          .forEach(
+              location -> {
+                beforeRegionCounter.merge(location.getDataNodeId(), 1, Integer::sum);
+                beforeDiskCounter.merge(
+                    location.getDataNodeId(),
+                    fakeStatisticsMap.get(regionReplicaSet.regionId).getDiskUsage(),
+                    Long::sum);
+              });
+    }
+    LOGGER.info("Region count before migration: {}", beforeRegionCounter);
+    LOGGER.info("Disk count before migration: {}", beforeDiskCounter);
+
+    // Execute migration
+    Map<TConsensusGroupId, TRegionReplicaSet> migrationPlan =
+        MIGRATOR.autoBalanceRegionReplicasDistribution(
+            availableDataNodeMap, fakeStatisticsMap, allocatedResult, replicaCount);
+
+    // Calculate migration cost
+    long migrationCost = 0;
+    for (TRegionReplicaSet regionReplicaSet : allocatedResult) {
+      Set<Integer> originSet =
+          regionReplicaSet.getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toSet());
+      Set<Integer> migrationSet =
+          migrationPlan.get(regionReplicaSet.getRegionId()).getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toSet());
+      LOGGER.info(
+          "Region ID: {}, original nodes: {}, target nodes: {}",
+          regionReplicaSet.getRegionId().getId(),
+          originSet,
+          migrationSet);
+      migrationSet.removeAll(originSet);
+      migrationCost +=
+          migrationSet.size() * fakeStatisticsMap.get(regionReplicaSet.regionId).getDiskUsage();
+    }
+    LOGGER.info("Migration cost: {}", migrationCost);
+
+    // Record statistics after migration
+    Map<Integer, Integer> afterRegionCounter = new TreeMap<>();
+    Map<Integer, Long> afterDiskCounter = new TreeMap<>();
+    for (TRegionReplicaSet regionReplicaSet : migrationPlan.values()) {
+      regionReplicaSet
+          .getDataNodeLocations()
+          .forEach(
+              location -> {
+                afterRegionCounter.merge(location.getDataNodeId(), 1, Integer::sum);
+                afterDiskCounter.merge(
+                    location.getDataNodeId(),
+                    fakeStatisticsMap.get(regionReplicaSet.regionId).getDiskUsage(),
+                    Long::sum);
+              });
+    }
+    LOGGER.info("Region count after migration: {}", afterRegionCounter);
+    LOGGER.info("Disk count after migration: {}", afterDiskCounter);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 2c037cf0f3e5..32be7e4a4b55 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -1303,6 +1303,12 @@ public TSStatus migrateRegion(TMigrateRegionReq req) throws TException { () -> client.migrateRegion(req), status -> !updateConfigNodeLeader(status)); } + @Override + public TSStatus balanceRegions() throws TException { + return executeRemoteCallWithRetry( + () -> client.balanceRegions(), status -> !updateConfigNodeLeader(status)); + } + @Override public TSStatus reconstructRegion(TReconstructRegionReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index cb4d05f1b99a..180f0329028a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.AlterEncodingCompressorTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.BalanceRegionsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountTimeSlotListTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreateContinuousQueryTask; @@ -126,6 +127,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterEncodingCompressorStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.BalanceRegionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement; @@ -772,6 +774,12 @@ public IConfigTask visitMigrateRegion( return new MigrateRegionTask(migrateRegionStatement); } + @Override + public IConfigTask visitBalanceRegions( + BalanceRegionsStatement balanceRegionsStatement, MPPQueryContext context) { + return new BalanceRegionsTask(balanceRegionsStatement); + } + @Override public IConfigTask visitReconstructRegion( ReconstructRegionStatement reconstructRegionStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index fbda43909092..0f65fc2b4b0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -3302,6 +3302,24 @@ public SettableFuture migrateRegion(final MigrateRegionTask mi return future; } + @Override + public SettableFuture balanceRegions() { + final SettableFuture future = 
SettableFuture.create(); + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TSStatus status = configNodeClient.balanceRegions(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException(new IoTDBException(status.message, status.code)); + return future; + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (Exception e) { + future.setException(e); + } + return future; + } + @Override public SettableFuture removeDataNode( final RemoveDataNodeStatement removeDataNodeStatement) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 85455756b052..2406e3bae4b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -272,6 +272,8 @@ SettableFuture countTimeSlotList( SettableFuture migrateRegion(MigrateRegionTask migrateRegionTask); + SettableFuture balanceRegions(); + SettableFuture reconstructRegion(ReconstructRegionTask reconstructRegionTask); SettableFuture extendRegion(ExtendRegionTask extendRegionTask); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/BalanceRegionsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/BalanceRegionsTask.java new file mode 100644 index 000000000000..22a918d8df50 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/BalanceRegionsTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.BalanceRegionsStatement; + +import com.google.common.util.concurrent.ListenableFuture; + +public class BalanceRegionsTask implements IConfigTask { + + protected final BalanceRegionsStatement statement; + + public BalanceRegionsTask(BalanceRegionsStatement balanceRegionsStatement) { + this.statement = balanceRegionsStatement; + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.balanceRegions(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 7f3936af5115..51756f350b34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -146,6 +146,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.literal.StringLiteral; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterEncodingCompressorStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.BalanceRegionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDevicesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountLevelTimeSeriesStatement; @@ -4433,6 +4434,11 @@ public Statement visitMigrateRegion(IoTDBSqlParser.MigrateRegionContext ctx) { Integer.parseInt(ctx.toId.getText())); } + @Override + public Statement visitBalanceRegions(IoTDBSqlParser.BalanceRegionsContext ctx) { + return new BalanceRegionsStatement(); + } + @Override public Statement visitReconstructRegion(IoTDBSqlParser.ReconstructRegionContext ctx) { int dataNodeId = Integer.parseInt(ctx.targetDataNodeId.getText()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index 89daafccfbbc..e4ad85196ad0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterEncodingCompressorStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.BalanceRegionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDevicesStatement; import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CountLevelTimeSeriesStatement; @@ -1610,6 +1611,12 @@ public TSStatus visitMigrateRegion( return checkGlobalAuth(context, PrivilegeType.MAINTAIN, () -> ""); } + @Override + public TSStatus visitBalanceRegions( + BalanceRegionsStatement statement, TreeAccessCheckContext context) { + return checkGlobalAuth(context, PrivilegeType.MAINTAIN, () -> ""); + } + @Override public TSStatus visitReconstructRegion( ReconstructRegionStatement statement, TreeAccessCheckContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index eb38d9ba3902..5bf7d2424c3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterEncodingCompressorStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.BalanceRegionsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDevicesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountLevelTimeSeriesStatement; @@ -635,6 +636,10 @@ public R visitMigrateRegion(MigrateRegionStatement migrateRegionStatement, C con return visitStatement(migrateRegionStatement, context); } + public R visitBalanceRegions(BalanceRegionsStatement balanceRegionsStatement, C context) { + return visitStatement(balanceRegionsStatement, context); + } + public R visitReconstructRegion( ReconstructRegionStatement reconstructRegionStatement, C context) { return visitStatement(reconstructRegionStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/BalanceRegionsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/BalanceRegionsStatement.java new file mode 100644 index 000000000000..49e9bef03778 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/BalanceRegionsStatement.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.statement.metadata; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Collections; +import java.util.List; + +public class BalanceRegionsStatement extends Statement implements IConfigStatement { + + public BalanceRegionsStatement() { + super(); + } + + @Override + public R accept(StatementVisitor visitor, C context) { + return visitor.visitBalanceRegions(this, context); + } + + @Override + public QueryType getQueryType() { + return QueryType.WRITE; + } + + @Override + public List getPaths() { + return Collections.emptyList(); + } +} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index d8f6318063eb..5a60955376f9 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -1764,6 +1764,9 @@ service IConfigNodeRPCService { /** Migrate a region replica from one dataNode to another */ common.TSStatus migrateRegion(TMigrateRegionReq req) + /** Auto balance regions */ + common.TSStatus balanceRegions() + common.TSStatus reconstructRegion(TReconstructRegionReq req) common.TSStatus extendRegion(TExtendRegionReq req) From a14381950190d3213a3dc20d10d4b00a84c14146 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu Date: Mon, 8 Dec 2025 13:34:39 +0800 Subject: [PATCH 2/2] load blance IT & show migrations --- .../it/load/IoTDBLoadBalanceIT.java | 330 ++++++++++++++++++ .../apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 7 +- .../org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 + .../confignode/manager/ConfigManager.java | 16 + .../confignode/manager/ProcedureManager.java | 34 ++ .../GreedyCopySetRegionGroupMigrator.java | 35 +- .../impl/region/RegionMigrateProcedure.java | 13 + .../thrift/ConfigNodeRPCServiceProcessor.java | 7 + .../db/protocol/client/ConfigNodeClient.java | 8 + .../common/header/DatasetHeaderFactory.java | 4 + .../config/TreeConfigTaskVisitor.java | 8 + .../executor/ClusterConfigTaskExecutor.java | 29 ++ .../config/executor/IConfigTaskExecutor.java | 4 + .../config/metadata/ShowMigrationsTask.java | 111 ++++++ .../queryengine/plan/parser/ASTVisitor.java | 8 + .../security/TreeAccessCheckVisitor.java | 7 + .../plan/statement/StatementVisitor.java | 5 + .../metadata/ShowMigrationsStatement.java | 39 +++ .../schema/column/ColumnHeaderConstant.java | 23 ++ .../src/main/thrift/confignode.thrift | 28 ++ 20 files changed, 718 insertions(+), 2 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBLoadBalanceIT.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowMigrationsTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowMigrationsStatement.java diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBLoadBalanceIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBLoadBalanceIT.java new file mode 100644 index 000000000000..fd23173d8c6b --- /dev/null +++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBLoadBalanceIT.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.load; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap; +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBLoadBalanceIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLoadBalanceIT.class); + + private static final int INITIAL_DATA_NODE_NUM = 3; + private static final int FINAL_DATA_NODE_NUM = 4; + private static final int TEST_REPLICATION_FACTOR = 2; + private static final String DATABASE = "root.db"; + private static final int TEST_DATABASE_NUM = 2; + private static final long TEST_TIME_PARTITION_INTERVAL = 604800000; + private static final int TEST_MIN_DATA_REGION_GROUP_NUM = 3; + private static final String LOAD_BALANCE_SQL = "LOAD BALANCE"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + 
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(TEST_REPLICATION_FACTOR)
+        .setDataReplicationFactor(TEST_REPLICATION_FACTOR)
+        .setDataRegionGroupExtensionPolicy("CUSTOM")
+        .setDefaultDataRegionGroupNumPerDatabase(TEST_MIN_DATA_REGION_GROUP_NUM)
+        .setTimePartitionInterval(TEST_TIME_PARTITION_INTERVAL);
+    // Init 1C3D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, INITIAL_DATA_NODE_NUM);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testRegionLoadBalanceAfterAddingDataNode() throws Exception {
+    final int retryNum = 100;
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      // Create databases and partitions to generate multiple regions
+      createDatabasesAndPartitions(client);
+
+      // Check initial region distribution
+      Map<Integer, Integer> beforeRegionCounter = getRegionDistribution(client);
+      LOGGER.info("Region distribution before migration: {}", beforeRegionCounter);
+
+      // Verify initial distribution is not balanced (should have variance)
+      int maxBefore = beforeRegionCounter.values().stream().max(Integer::compareTo).orElse(0);
+      int minBefore = beforeRegionCounter.values().stream().min(Integer::compareTo).orElse(0);
+      LOGGER.info(
+          "Max Region count before migration: {}, Min Region count: {}", maxBefore, minBefore);
+
+      // Add a new data node to trigger load balance
+      // Let DataNode register itself during startup instead of pre-registering via RPC
+      // Pre-registration causes endpoint conflict when DataNode tries to register during startup
+      EnvFactory.getEnv().registerNewDataNode(true);
+      LOGGER.info("New data node started successfully");
+
+      // Manually trigger load balance using SQL statement
+      try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+          Statement statement = makeItCloseQuietly(connection.createStatement())) {
+        statement.execute(LOAD_BALANCE_SQL);
+        LOGGER.info("Load balance triggered successfully");
+      }
+
+      // Wait for migration to complete
+      try {
+        awaitMigrationComplete();
+      } catch (ConditionTimeoutException e) {
+        LOGGER.error("Region migration did not complete in time", e);
+        Assert.fail();
+      }
+
+      // Wait for load balance to complete
+      Map<Integer, Integer> afterRegionCounter = null;
+      boolean isBalanced = false;
+      for (int retry = 0; retry < retryNum; retry++) {
+        afterRegionCounter = getRegionDistribution(client);
+        int maxAfter = afterRegionCounter.values().stream().max(Integer::compareTo).orElse(0);
+        int minAfter = afterRegionCounter.values().stream().min(Integer::compareTo).orElse(0);
+        int variance = maxAfter - minAfter;
+
+        LOGGER.info(
+            "Retry {}: Region distribution after migration: {}, Max: {}, Min: {}, Variance: {}",
+            retry,
+            afterRegionCounter,
+            maxAfter,
+            minAfter,
+            variance);
+
+        // Check if distribution is balanced (variance should be <= 1)
+        // Also verify that the new node has some regions
+        if (variance <= 1 && afterRegionCounter.size() == FINAL_DATA_NODE_NUM) {
+          isBalanced = true;
+          break;
+        }
+
+        TimeUnit.SECONDS.sleep(2);
+      }
+
+      Assert.assertTrue("Load balance not completed", isBalanced);
+      Assert.assertNotNull("Cannot get Region distribution after migration", afterRegionCounter);
+      LOGGER.info("Region distribution after migration: {}", afterRegionCounter);
+
+      // Verify the new data node has regions
+      // The new node should have DataNodeId = 3 (since we started with 3 nodes: 0, 1, 2)
+      // DataNodeId starts from 0, so with INITIAL_DATA_NODE_NUM=3, IDs are 0,1,2, and new one is 3
+      int newDataNodeId = INITIAL_DATA_NODE_NUM;
+      Assert.assertTrue(
+          "New data node should contain Regions",
+          afterRegionCounter.containsKey(newDataNodeId)
+              && afterRegionCounter.get(newDataNodeId) > 0);
+
+      // Verify distribution is more balanced
+      int maxAfter = afterRegionCounter.values().stream().max(Integer::compareTo).orElse(0);
+      int minAfter = afterRegionCounter.values().stream().min(Integer::compareTo).orElse(0);
+      Assert.assertTrue(
+          "After load balance, the difference between max and min Region count should <= 1",
+          maxAfter - minAfter <= 1);
+    }
+
+    // Also verify using SQL query
+    try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+        Statement statement = makeItCloseQuietly(connection.createStatement())) {
+
+      Map<Integer, Set<Integer>> regionMap = getDataRegionMap(statement);
+      Map<Integer, Integer> sqlRegionCounter = new TreeMap<>();
+      regionMap.forEach(
+          (regionId, dataNodeIds) -> {
+            for (Integer dataNodeId : dataNodeIds) {
+              sqlRegionCounter.merge(dataNodeId, 1, Integer::sum);
+            }
+          });
+
+      LOGGER.info("Region distribution queried via SQL: {}", sqlRegionCounter);
+
+      // Verify each region has correct replication factor
+      regionMap.forEach(
+          (regionId, dataNodeIds) -> {
+            Assert.assertEquals(
+                "Each Region should have correct replication factor",
+                TEST_REPLICATION_FACTOR,
+                dataNodeIds.size());
+          });
+
+      // Verify distribution is balanced
+      int maxSql = sqlRegionCounter.values().stream().max(Integer::compareTo).orElse(0);
+      int minSql = sqlRegionCounter.values().stream().min(Integer::compareTo).orElse(0);
+      Assert.assertTrue(
+          "Via SQL query, after load balance, the difference between max and min Region count should <= 1",
+          maxSql - minSql <= 1);
+    }
+  }
+
+  private void createDatabasesAndPartitions(SyncConfigNodeIServiceClient client) throws Exception {
+    for (int i = 0; i < TEST_DATABASE_NUM; i++) {
+      String curSg = DATABASE + i;
+
+      // Set Database
+      TSStatus status = client.setDatabase(new TDatabaseSchema(curSg));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // Create DataPartitions to create DataRegionGroups
+      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+          ConfigNodeTestUtils.constructPartitionSlotsMap(
+              curSg, 0, 10, 0, 10, TEST_TIME_PARTITION_INTERVAL);
+      TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(new TDataPartitionReq(partitionSlotsMap));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          dataPartitionTableResp.getStatus().getCode());
+
+      LOGGER.info("Partitions created successfully for database {}", curSg);
+    }
+  }
+
+  private Map<Integer, Integer> getRegionDistribution(SyncConfigNodeIServiceClient client)
+      throws Exception {
+    TShowRegionResp resp = client.showRegion(new TShowRegionReq());
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
+
+    Map<Integer, Integer> dataNodeRegionCounter = new TreeMap<>();
+    resp.getRegionInfoList()
+        .forEach(
+            regionInfo -> {
+              // Filter out system and audit databases
+              if (!regionInfo.getDatabase().equals(SystemConstant.SYSTEM_DATABASE)
+                  && !regionInfo.getDatabase().equals(SystemConstant.AUDIT_DATABASE)
+                  && TConsensusGroupType.DataRegion.equals(
+                      regionInfo.getConsensusGroupId().getType())) {
+                dataNodeRegionCounter.merge(regionInfo.getDataNodeId(), 1, Integer::sum);
+              }
+            });
+
+    return dataNodeRegionCounter;
+  }
+
+  /**
+   * Wait for region migration to complete. All regions should have the correct replication factor.
+   */
+  private void awaitMigrationComplete() {
+    AtomicReference<Exception> lastException = new AtomicReference<>();
+    AtomicReference<Map<Integer, Set<Integer>>> lastRegionMap = new AtomicReference<>();
+
+    try {
+      Awaitility.await()
+          .atMost(5, TimeUnit.MINUTES)
+          .pollDelay(2, TimeUnit.SECONDS)
+          .pollInterval(2, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                try (Connection connection =
+                        makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+                    Statement statement = makeItCloseQuietly(connection.createStatement())) {
+                  Map<Integer, Set<Integer>> regionMap = getDataRegionMap(statement);
+                  lastRegionMap.set(regionMap);
+
+                  // Check if all regions have correct replication factor
+                  for (Map.Entry<Integer, Set<Integer>> entry : regionMap.entrySet()) {
+                    int regionId = entry.getKey();
+                    Set<Integer> dataNodeIds = entry.getValue();
+                    if (dataNodeIds.size() != TEST_REPLICATION_FACTOR) {
+                      LOGGER.info(
+                          "Region {} has {} replicas, expected {}, migration not finished yet",
+                          regionId,
+                          dataNodeIds.size(),
+                          TEST_REPLICATION_FACTOR);
+                      return false;
+                    }
+                  }
+                  return true;
+                } catch (Exception e) {
+                  lastException.set(e);
+                  LOGGER.warn("Exception while checking migration status: {}", e.getMessage());
+                  return false;
+                }
+              });
+      LOGGER.info("Region migration completed successfully");
+    } catch (ConditionTimeoutException e) {
+      if (lastRegionMap.get() == null) {
+        LOGGER.error(
+            "Migration check failed, lastRegionMap is null, last Exception:", lastException.get());
+        throw e;
+      }
+      StringBuilder errorMsg = new StringBuilder();
+      errorMsg.append("Region migration timeout in 5 minutes. Regions with incorrect replication:");
+      for (Map.Entry<Integer, Set<Integer>> entry : lastRegionMap.get().entrySet()) {
+        int regionId = entry.getKey();
+        Set<Integer> dataNodeIds = entry.getValue();
+        if (dataNodeIds.size() != TEST_REPLICATION_FACTOR) {
+          errorMsg.append(
+              String.format(
+                  " Region %d has %d replicas (expected %d);",
+                  regionId, dataNodeIds.size(), TEST_REPLICATION_FACTOR));
+        }
+      }
+      LOGGER.error(errorMsg.toString());
+      if (lastException.get() != null) {
+        LOGGER.error("Last exception during awaiting:", lastException.get());
+      }
+      throw e;
+    }
+  }
+}
diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 8bb06a1dceb4..e7719e63e6c2 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -61,7 +61,7 @@ ddlStatement
     // CQ
     | createContinuousQuery | dropContinuousQuery | showContinuousQueries
     // Cluster
-    | showVariables | showCluster | showRegions | showDataNodes | showAvailableUrls | showConfigNodes | showClusterId
+    | showVariables | showCluster | showMigrations | showRegions | showDataNodes | showAvailableUrls | showConfigNodes | showClusterId
     | getRegionId | getTimeSlotList | countTimeSlotList | getSeriesSlotList | migrateRegion | reconstructRegion | extendRegion | removeRegion
     | removeDataNode | removeConfigNode | removeAINode
     | verifyConnection | balanceRegions
@@ -486,6 +486,11 @@ showRegions
         (ON NODEID INTEGER_LITERAL (COMMA INTEGER_LITERAL)*)?
; +// ---- Show Migrations +showMigrations + : SHOW MIGRATIONS + ; + // ---- Show Data Nodes showDataNodes : SHOW DATANODES diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 8d84d02c4544..4a7e748ea8ca 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -522,6 +522,10 @@ MIGRATE : M I G R A T E ; +MIGRATIONS + : M I G R A T I O N S + ; + AINODE : A I N O D E ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 91c111344f62..cca6f3abb25c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -222,6 +222,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TMigrationInfo; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; @@ -241,6 +242,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -1958,6 +1961,19 @@ public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan } } + public TShowMigrationsResp showMigrations(TShowMigrationsReq showMigrationsReq) { + TSStatus status = confirmLeader(); + TShowMigrationsResp showMigrationsResp = new TShowMigrationsResp(); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + List migrationInfoList = procedureManager.getRunningMigrations(); + showMigrationsResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + showMigrationsResp.setMigrationInfoList(migrationInfoList); + } else { + showMigrationsResp.setStatus(status); + } + return showMigrationsResp; + } + @Override public TShowAINodesResp showAINodes() { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2e4227af3fc8..3c5e182de211 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.trigger.TriggerInformation; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import 
org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; @@ -124,6 +125,7 @@ import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; +import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore; import org.apache.iotdb.confignode.procedure.store.IProcedureStore; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; @@ -144,6 +146,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TMigrationInfo; import org.apache.iotdb.confignode.rpc.thrift.TReconstructRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TRemoveRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; @@ -2312,4 +2315,35 @@ public void removeMetrics() { public ProcedureMetrics getProcedureMetrics() { return procedureMetrics; } + + /** + * Get all running region migration procedures + * + * @return List of migration info + */ + public List getRunningMigrations() { + return getExecutor().getProcedures().values().stream() + .filter(procedure -> procedure instanceof RegionMigrateProcedure) + .filter(procedure -> !procedure.isFinished()) + .map( + procedure -> { + RegionMigrateProcedure migrateProc = (RegionMigrateProcedure) procedure; + TMigrationInfo info = new TMigrationInfo(); + info.setProcedureId(migrateProc.getProcId()); + info.setRegionId(migrateProc.getRegionId().getId()); + info.setRegionType(migrateProc.getRegionId().getType()); + info.setFromNodeId(migrateProc.getOriginalDataNode().getDataNodeId()); + info.setToNodeId(migrateProc.getDestDataNode().getDataNodeId()); + RegionTransitionState currentState = migrateProc.getCurrentRegionTransitionState(); + info.setCurrentState(currentState != null ? 
currentState.name() : "UNKNOWN"); + info.setProcedureStatus(migrateProc.getState().name()); + info.setSubmittedTime(migrateProc.getSubmittedTime()); + info.setLastUpdateTime(migrateProc.getLastUpdate()); + // Calculate duration + long duration = System.currentTimeMillis() - migrateProc.getSubmittedTime(); + info.setDuration(CommonDateTimeUtils.convertMillisecondToDurationStr(duration)); + return info; + }) + .collect(Collectors.toList()); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java index 40515d16680a..4bb3d5e5b38a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/migrator/GreedyCopySetRegionGroupMigrator.java @@ -168,16 +168,49 @@ private void prepare( } List dataNodeIdList = new ArrayList<>(availableDataNodeMap.keySet()); - dataNodeIdList.sort(Comparator.comparingLong(node -> diskCounter[node])); + // Print disk usage for all nodes before sorting + LOGGER.info("[LoadBalance] Disk usage for all nodes before sorting:"); + for (Integer nodeId : dataNodeIdList) { + LOGGER.info( + "[LoadBalance] Node {}: diskUsage={}, regionCount={}", + nodeId, + diskCounter[nodeId], + regionCounter[nodeId]); + } + // Sort by disk usage first, then by region count if disk usage is equal + dataNodeIdList.sort( + Comparator.comparingLong((Integer node) -> diskCounter[node]) + .thenComparingInt(node -> regionCounter[node])); + // Print disk usage for all nodes after sorting + LOGGER.info("[LoadBalance] Node order after sorting by disk usage (ascending):"); + for (int i = 0; i < dataNodeIdList.size(); i++) { + Integer nodeId = dataNodeIdList.get(i); + LOGGER.info( + "[LoadBalance] Rank {}: Node {} (diskUsage={}, regionCount={})", + i, + nodeId, + diskCounter[nodeId], + regionCounter[nodeId]); + } availableToDataNodeSet = new HashSet<>(); for (int i = 0; i < 1; i++) { availableToDataNodeSet.add(dataNodeIdList.get(i)); System.out.println("Available To Node: " + dataNodeIdList.get(i)); + LOGGER.info( + "[LoadBalance] Selected as target node: Node {} (diskUsage={}, regionCount={})", + dataNodeIdList.get(i), + diskCounter[dataNodeIdList.get(i)], + regionCounter[dataNodeIdList.get(i)]); } availableFromDataNodeSet = new HashSet<>(); for (int i = 1; i < dataNodeIdList.size(); i++) { availableFromDataNodeSet.add(dataNodeIdList.get(i)); System.out.println("Available From Node: " + dataNodeIdList.get(i)); + LOGGER.info( + "[LoadBalance] Selected as source node: Node {} (diskUsage={}, regionCount={})", + dataNodeIdList.get(i), + diskCounter[dataNodeIdList.get(i)], + regionCounter[dataNodeIdList.get(i)]); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java index 3d70f04f76d4..892130d9cb77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java @@ -209,4 +209,17 @@ public int hashCode() { public 
TDataNodeLocation getDestDataNode() { return destDataNode; } + + public TDataNodeLocation getOriginalDataNode() { + return originalDataNode; + } + + /** + * Get the current RegionTransitionState of this migration procedure + * + * @return the current RegionTransitionState, or null if not available + */ + public RegionTransitionState getCurrentRegionTransitionState() { + return getCurrentState(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index c4bc2e348852..f7276dd6028d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -203,6 +203,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -1064,6 +1066,11 @@ public TShowRegionResp showRegion(final TShowRegionReq showRegionReq) { return showRegionResp; } + @Override + public TShowMigrationsResp showMigrations(final TShowMigrationsReq showMigrationsReq) { + return configManager.showMigrations(showMigrationsReq); + } + @Override public TRegionRouteMapResp getLatestRegionRouteMap() { return configManager.getLatestRegionRouteMap(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 32be7e4a4b55..7b7611a6215f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -161,6 +161,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodes4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; @@ -884,6 +886,12 @@ public TShowRegionResp showRegion(TShowRegionReq req) throws TException { () -> client.showRegion(req), resp -> !updateConfigNodeLeader(resp.status)); } + @Override + public TShowMigrationsResp showMigrations(TShowMigrationsReq req) throws TException { + return executeRemoteCallWithRetry( + () -> client.showMigrations(req), resp -> !updateConfigNodeLeader(resp.status)); + } + @Override public TShowDataNodesResp showDataNodes() throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java index d7fd3e071c64..c57b0a037ab2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java @@ -121,6 +121,10 @@ public static DatasetHeader getShowRegionHeader() { return new DatasetHeader(ColumnHeaderConstant.showRegionColumnHeaders, true); } + public static DatasetHeader getShowMigrationsHeader() { + return new DatasetHeader(ColumnHeaderConstant.showMigrationsColumnHeaders, true); + } + public static DatasetHeader getShowAINodesHeader() { return new DatasetHeader(ColumnHeaderConstant.showAINodesColumnHeaders, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 180f0329028a..d38e8a90f272 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowDataNodesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowDatabaseTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowMigrationsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowTTLTask; @@ -154,6 +155,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDataNodesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowFunctionsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTriggersStatement; @@ -492,6 +494,12 @@ public IConfigTask visitShowRegion( return new ShowRegionTask(showRegionStatement, false); } + @Override + public IConfigTask visitShowMigrations( + ShowMigrationsStatement showMigrationsStatement, MPPQueryContext context) { + return new ShowMigrationsTask(showMigrationsStatement, false); + } + @Override public IConfigTask visitCreateSchemaTemplate( CreateSchemaTemplateStatement createSchemaTemplateStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 0f65fc2b4b0b..eb8e8486cb93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -146,6 +146,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; @@ -204,6 +206,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowContinuousQueriesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowDataNodesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowMigrationsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowTTLTask; @@ -264,6 +267,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement; @@ -1706,6 +1710,31 @@ public SettableFuture showRegion( return future; } + @Override + public SettableFuture showMigrations( + final ShowMigrationsStatement showMigrationsStatement, final boolean isTableModel) { + final SettableFuture future = SettableFuture.create(); + TShowMigrationsResp showMigrationsResp = new TShowMigrationsResp(); + final TShowMigrationsReq showMigrationsReq = + new TShowMigrationsReq().setIsTableModel(isTableModel); + try (final ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + showMigrationsResp = client.showMigrations(showMigrationsReq); + if (showMigrationsResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException( + new IoTDBException( + showMigrationsResp.getStatus().message, showMigrationsResp.getStatus().code)); + return future; + } + } catch (final ClientManagerException | TException e) { + future.setException(e); + } + + // build TSBlock + ShowMigrationsTask.buildTSBlock(showMigrationsResp, future, isTableModel); + return future; + } + @Override public SettableFuture showDataNodes() { final SettableFuture future = SettableFuture.create(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 2406e3bae4b6..28bde6629e4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement; @@ -181,6 +182,9 @@ SettableFuture showAppliedConfigurations( SettableFuture showRegion( final ShowRegionStatement showRegionStatement, final boolean isTableModel); + SettableFuture showMigrations( + final ShowMigrationsStatement showMigrationsStatement, final boolean isTableModel); + SettableFuture showDataNodes(); SettableFuture showAvailableUrls(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowMigrationsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowMigrationsTask.java new file mode 100644 index 000000000000..b0066a5ca66b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowMigrationsTask.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.confignode.rpc.thrift.TMigrationInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement; +import org.apache.iotdb.db.utils.DateTimeUtils; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.BytesUtils; + +import java.util.List; +import java.util.stream.Collectors; + +public class ShowMigrationsTask implements IConfigTask { + + private final ShowMigrationsStatement showMigrationsStatement; + private final boolean isTableModel; + + public ShowMigrationsTask( + final ShowMigrationsStatement showMigrationsStatement, final boolean isTableModel) { + this.showMigrationsStatement = showMigrationsStatement; + this.isTableModel = isTableModel; + } + + @Override + public ListenableFuture execute(final IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.showMigrations(showMigrationsStatement, isTableModel); + } + + public static void buildTSBlock( + final TShowMigrationsResp showMigrationsResp, + final SettableFuture future, + final boolean isTableModel) { + final List outputDataTypes = + ColumnHeaderConstant.showMigrationsColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList()); + final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes); + if (showMigrationsResp.getMigrationInfoList() != null) { + for (final TMigrationInfo migrationInfo : showMigrationsResp.getMigrationInfoList()) { + builder.getTimeColumnBuilder().writeLong(0L); + builder.getColumnBuilder(0).writeLong(migrationInfo.getProcedureId()); + builder.getColumnBuilder(1).writeInt(migrationInfo.getRegionId()); + if (migrationInfo.getRegionType().ordinal() == TConsensusGroupType.SchemaRegion.ordinal()) { + builder + .getColumnBuilder(2) + .writeBinary(BytesUtils.valueOf(String.valueOf(TConsensusGroupType.SchemaRegion))); + } else if (migrationInfo.getRegionType().ordinal() + == TConsensusGroupType.DataRegion.ordinal()) { + builder + .getColumnBuilder(2) + .writeBinary(BytesUtils.valueOf(String.valueOf(TConsensusGroupType.DataRegion))); + } + builder.getColumnBuilder(3).writeInt(migrationInfo.getFromNodeId()); + builder.getColumnBuilder(4).writeInt(migrationInfo.getToNodeId()); + builder + .getColumnBuilder(5) + .writeBinary(BytesUtils.valueOf(migrationInfo.getCurrentState())); + builder + .getColumnBuilder(6) + .writeBinary(BytesUtils.valueOf(migrationInfo.getProcedureStatus())); + builder + .getColumnBuilder(7) + .writeBinary( + BytesUtils.valueOf( + DateTimeUtils.convertLongToDate(migrationInfo.getSubmittedTime()))); + 
+        builder
+            .getColumnBuilder(8)
+            .writeBinary(
+                BytesUtils.valueOf(
+                    DateTimeUtils.convertLongToDate(migrationInfo.getLastUpdateTime())));
+        builder.getColumnBuilder(9).writeBinary(BytesUtils.valueOf(migrationInfo.getDuration()));
+        builder.declarePosition();
+      }
+    }
+    final DatasetHeader datasetHeader = DatasetHeaderFactory.getShowMigrationsHeader();
+    future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 51756f350b34..22e5c5bc459e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -183,6 +183,7 @@
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
@@ -3776,6 +3777,13 @@ public Statement visitShowRegions(IoTDBSqlParser.ShowRegionsContext ctx) {
     return showRegionStatement;
   }
 
+  // show migrations
+
+  @Override
+  public Statement visitShowMigrations(IoTDBSqlParser.ShowMigrationsContext ctx) {
+    return new ShowMigrationsStatement();
+  }
+
   // show datanodes
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
index e4ad85196ad0..b127158fb1ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
@@ -85,6 +85,7 @@
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
@@ -1714,6 +1715,12 @@ public TSStatus visitShowRegion(ShowRegionStatement statement, TreeAccessCheckCo
     return checkGlobalAuth(context, PrivilegeType.MAINTAIN, () -> "");
   }
 
+  @Override
+  public TSStatus visitShowMigrations(
+      ShowMigrationsStatement statement, TreeAccessCheckContext context) {
+    return checkGlobalAuth(context, PrivilegeType.MAINTAIN, () -> "");
+  }
+
   @Override
   public TSStatus visitSetSpaceQuota(
       SetSpaceQuotaStatement setSpaceQuotaStatement, TreeAccessCheckContext context) {
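buildTSBlock converts each TMigrationInfo in the response into one TsBlock row, writing a dummy timestamp and then the ten value columns in header order. A test-style sketch of driving it directly follows; it is not part of the patch, the Thrift setter names are assumed to follow the usual generated-bean conventions for the structs introduced in confignode.thrift further down, and the state, status, and duration strings are placeholders.

```java
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.confignode.rpc.thrift.TMigrationInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowMigrationsTask;

import com.google.common.util.concurrent.SettableFuture;

import java.util.Collections;

/** Hedged sketch, not part of the patch: drive buildTSBlock with one fake migration. */
public class ShowMigrationsTaskSketch {

  public static void main(String[] args) throws Exception {
    // Setters assumed from Thrift codegen conventions for the TMigrationInfo struct.
    TMigrationInfo info = new TMigrationInfo();
    info.setProcedureId(42L);
    info.setRegionId(1);
    info.setRegionType(TConsensusGroupType.DataRegion);
    info.setFromNodeId(3);
    info.setToNodeId(5);
    info.setCurrentState("TRANSFERRING_DATA"); // placeholder state string
    info.setProcedureStatus("RUNNING"); // placeholder status string
    info.setSubmittedTime(System.currentTimeMillis());
    info.setLastUpdateTime(System.currentTimeMillis());
    info.setDuration("12s"); // placeholder duration string

    TShowMigrationsResp resp = new TShowMigrationsResp();
    resp.setMigrationInfoList(Collections.singletonList(info));

    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
    ShowMigrationsTask.buildTSBlock(resp, future, false);

    // The future completes immediately with a single-row result whose ten value
    // columns follow ColumnHeaderConstant.showMigrationsColumnHeaders.
    System.out.println(future.get());
  }
}
```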
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 5bf7d2424c3c..1e8503648247 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -74,6 +74,7 @@
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
@@ -504,6 +505,10 @@ public R visitShowRegion(ShowRegionStatement showRegionStatement, C context) {
     return visitStatement(showRegionStatement, context);
   }
 
+  public R visitShowMigrations(ShowMigrationsStatement showMigrationsStatement, C context) {
+    return visitStatement(showMigrationsStatement, context);
+  }
+
   public R visitShowDataNodes(ShowDataNodesStatement showDataNodesStatement, C context) {
     return visitStatement(showDataNodesStatement, context);
   }
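With the ASTVisitor hook above and the StatementVisitor default that falls back to visitStatement, parsing the new statement should route cleanly through the visitor chain. The sketch below is not part of the patch: the SQL spelling "SHOW MIGRATIONS" and the use of StatementGenerator.createStatement(sql, zoneId) as the entry point are both assumptions, since the showMigrations grammar rule itself is outside these hunks.

```java
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowMigrationsStatement;

import java.time.ZoneId;

/** Hedged sketch, not part of the patch: parse the assumed SQL into the new statement. */
public class ShowMigrationsParseSketch {

  public static void main(String[] args) {
    // "SHOW MIGRATIONS" is an assumed spelling of the new showMigrations rule;
    // the entry-point helper and its signature are likewise assumptions.
    Statement parsed =
        StatementGenerator.createStatement("SHOW MIGRATIONS", ZoneId.systemDefault());

    // ASTVisitor#visitShowMigrations (above) builds the statement, and
    // StatementVisitor then routes it through visitShowMigrations.
    System.out.println(parsed instanceof ShowMigrationsStatement); // expected: true
  }
}
```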
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowMigrationsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowMigrationsStatement.java
new file mode 100644
index 000000000000..57fbec56075d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/ShowMigrationsStatement.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.metadata;
+
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+public class ShowMigrationsStatement extends ShowStatement implements IConfigStatement {
+
+  public ShowMigrationsStatement() {}
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.READ;
+  }
+
+  @Override
+  public <R, C> R accept(final StatementVisitor<R, C> visitor, final C context) {
+    return visitor.visitShowMigrations(this, context);
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index 4f78ad9d5ea7..3c54bddeb640 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -169,6 +169,16 @@ private ColumnHeaderConstant() {
   public static final String TSFILE_SIZE = "TsFileSize";
   public static final String COMPRESSION_RATIO = "CompressionRatio";
 
+  // column names for show migrations statement
+  public static final String PROCEDURE_ID = "ProcedureId";
+  public static final String FROM_NODE_ID = "FromNodeId";
+  public static final String TO_NODE_ID = "ToNodeId";
+  public static final String CURRENT_STATE = "CurrentState";
+  public static final String PROCEDURE_STATUS = "ProcedureStatus";
+  public static final String SUBMITTED_TIME = "SubmittedTime";
+  public static final String LAST_UPDATE_TIME = "LastUpdateTime";
+  public static final String DURATION = "Duration";
+
   // column names for show datanodes
   public static final String SCHEMA_REGION_NUM = "SchemaRegionNum";
   public static final String DATA_REGION_NUM = "DataRegionNum";
@@ -445,6 +455,19 @@ private ColumnHeaderConstant() {
           new ColumnHeader(TSFILE_SIZE, TSDataType.TEXT),
           new ColumnHeader(COMPRESSION_RATIO, TSDataType.DOUBLE));
 
+  public static final List<ColumnHeader> showMigrationsColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(PROCEDURE_ID, TSDataType.INT64),
+          new ColumnHeader(REGION_ID, TSDataType.INT32),
+          new ColumnHeader(TYPE, TSDataType.TEXT),
+          new ColumnHeader(FROM_NODE_ID, TSDataType.INT32),
+          new ColumnHeader(TO_NODE_ID, TSDataType.INT32),
+          new ColumnHeader(CURRENT_STATE, TSDataType.TEXT),
+          new ColumnHeader(PROCEDURE_STATUS, TSDataType.TEXT),
+          new ColumnHeader(SUBMITTED_TIME, TSDataType.TEXT),
+          new ColumnHeader(LAST_UPDATE_TIME, TSDataType.TEXT),
+          new ColumnHeader(DURATION, TSDataType.TEXT));
+
   public static final List<ColumnHeader> showAINodesColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(NODE_ID, TSDataType.INT32),
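The ten column headers above define the client-visible result layout. A hypothetical end-to-end usage sketch over JDBC follows; it is not part of the patch, the SQL spelling is again an assumption, and the URL and credentials are placeholders for a locally running DataNode.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

/** Hypothetical client-side sketch, not part of the patch. */
public class ShowMigrationsJdbcExample {

  public static void main(String[] args) throws Exception {
    // Placeholder endpoint and credentials; requires the IoTDB JDBC driver on the classpath.
    try (Connection connection =
            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
        Statement statement = connection.createStatement();
        // Assumed spelling of the new statement.
        ResultSet resultSet = statement.executeQuery("SHOW MIGRATIONS")) {
      // Column order mirrors ColumnHeaderConstant.showMigrationsColumnHeaders:
      // ProcedureId, RegionId, Type, FromNodeId, ToNodeId, CurrentState,
      // ProcedureStatus, SubmittedTime, LastUpdateTime, Duration
      ResultSetMetaData metaData = resultSet.getMetaData();
      while (resultSet.next()) {
        StringBuilder row = new StringBuilder();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
          row.append(metaData.getColumnName(i))
              .append('=')
              .append(resultSet.getString(i))
              .append(' ');
        }
        System.out.println(row);
      }
    }
  }
}
```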
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 5a60955376f9..3b0b9894a0e7 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -764,6 +764,29 @@ struct TShowRegionResp {
   2: optional list<TRegionInfo> regionInfoList;
 }
 
+// Show migrations
+struct TShowMigrationsReq {
+  1: optional bool isTableModel
+}
+
+struct TMigrationInfo {
+  1: required i64 procedureId
+  2: required i32 regionId
+  3: required common.TConsensusGroupType regionType
+  4: required i32 fromNodeId
+  5: required i32 toNodeId
+  6: required string currentState
+  7: required string procedureStatus
+  8: required i64 submittedTime
+  9: required i64 lastUpdateTime
+  10: required string duration
+}
+
+struct TShowMigrationsResp {
+  1: required common.TSStatus status
+  2: optional list<TMigrationInfo> migrationInfoList;
+}
+
 // Routing
 struct TRegionRouteMapResp {
   1: required common.TSStatus status
@@ -1818,6 +1841,11 @@ service IConfigNodeRPCService {
    */
   TShowRegionResp showRegion(TShowRegionReq req)
 
+  /**
+   * Show all running region migration tasks
+   */
+  TShowMigrationsResp showMigrations(TShowMigrationsReq req)
+
   // ======================================================
   // Routing
   // ======================================================
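The new service method closes the loop: a DataNode-side executor can call showMigrations on the generated IConfigNodeRPCService client and hand the response to ShowMigrationsTask.buildTSBlock. The patch's actual executor and ConfigNodeClient changes are outside these hunks, so the helper below is only a plausible sketch built on the Thrift-generated Iface.

```java
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowMigrationsResp;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowMigrationsTask;
import org.apache.iotdb.rpc.TSStatusCode;

import com.google.common.util.concurrent.SettableFuture;

/** Hedged sketch, not part of the patch: one plausible executor-side use of the new RPC. */
public class ShowMigrationsRpcSketch {

  /** Calls the RPC on an already-connected ConfigNode client and converts the answer. */
  public static SettableFuture<ConfigTaskResult> fetch(
      final IConfigNodeRPCService.Iface configNodeClient, final boolean isTableModel) {
    final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
    try {
      final TShowMigrationsResp resp = configNodeClient.showMigrations(new TShowMigrationsReq());
      if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        // Reuse the row-building logic from the task added earlier in this patch.
        ShowMigrationsTask.buildTSBlock(resp, future, isTableModel);
      } else {
        future.setException(new IllegalStateException(resp.getStatus().getMessage()));
      }
    } catch (Exception e) {
      future.setException(e);
    }
    return future;
  }
}
```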