diff --git a/BOSSVeloxEngine/Source/BOSSVeloxEngine.cpp b/BOSSVeloxEngine/Source/BOSSVeloxEngine.cpp index 695e27d..26050ec 100644 --- a/BOSSVeloxEngine/Source/BOSSVeloxEngine.cpp +++ b/BOSSVeloxEngine/Source/BOSSVeloxEngine.cpp @@ -553,7 +553,8 @@ getColumns(ComplexExpression&& expression, memory::MemoryPool* pool) { [pool, &indices, &columnName](boss::Span&& typedSpan) -> VectorPtr { if constexpr(std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v) { + std::is_same_v || + std::is_same_v) { return spanToVelox(std::move(typedSpan), pool, indices); } else { throw std::runtime_error( @@ -809,7 +810,7 @@ static std::vector expressionToProjections(ComplexExpression&& e) { } PlanBuilder Engine::buildOperatorPipeline( - ComplexExpression&& e, std::vector>& scanIds, + ComplexExpression&& e, std::vector>& scanIds, memory::MemoryPool& pool, std::shared_ptr& planNodeIdGenerator, int& tableCnt, int& joinCnt) { if(e.getHead().getName() == "Table" || e.getHead().getName() == "Gather" || @@ -829,6 +830,7 @@ PlanBuilder Engine::buildOperatorPipeline( core::PlanNodeId scanId; auto numSpans = spanRowCountVec.size(); + auto numRows = std::accumulate(spanRowCountVec.begin(), spanRowCountVec.end(), 0); auto plan = PlanBuilder(planNodeIdGenerator) .startTableScan() .outputType(tableSchema) @@ -838,7 +840,7 @@ PlanBuilder Engine::buildOperatorPipeline( .assignments(assignmentsMap) .endTableScan() .capturePlanNodeId(scanId); - scanIds.emplace_back(scanId, numSpans); + scanIds.emplace_back(scanId, numSpans, numRows); return std::move(plan); } if(e.getHead().getName() == "Project") { @@ -889,7 +891,12 @@ PlanBuilder Engine::buildOperatorPipeline( it == itEnd ? std::vector{} : expressionToOneSideKeys(std::move(secondArg)); auto asExpr = get(it == itEnd ? std::move(secondArg) : std::move(*it)); auto aggregates = expressionToProjections(std::move(asExpr)); - return inputPlan.singleAggregation(groupKeysStr, aggregates); + if(maxThreads < 2) { + return inputPlan.singleAggregation(groupKeysStr, aggregates); + } + return inputPlan.partialAggregation(groupKeysStr, aggregates) + .localPartition(groupKeysStr) + .finalAggregation(); } if(e.getHead() == "Order"_ || e.getHead() == "OrderBy"_ || e.getHead() == "Sort"_ || e.getHead() == "SortBy"_) { @@ -898,7 +905,8 @@ PlanBuilder Engine::buildOperatorPipeline( auto inputPlan = buildOperatorPipeline(get(std::move(*it++)), scanIds, pool, planNodeIdGenerator, tableCnt, joinCnt); auto groupKeysStr = expressionToOneSideKeys(std::move(*it)); - return inputPlan.orderBy(groupKeysStr, true).localMerge(groupKeysStr); + return inputPlan.localPartition(std::vector{}) + .orderBy(groupKeysStr, false);//.localMerge(groupKeysStr); } if(e.getHead() == "Top"_ || e.getHead() == "TopN"_) { auto [head, unused_, dynamics, unused2_] = std::move(e).decompose(); @@ -908,7 +916,10 @@ PlanBuilder Engine::buildOperatorPipeline( auto groupKeysStr = expressionToOneSideKeys(std::move(*it++)); auto limit = std::holds_alternative(*it) ? std::get(std::move(*it)) : std::get(std::move(*it)); - return inputPlan.topN(groupKeysStr, limit, true).localMerge(groupKeysStr); + return inputPlan.localPartition(std::vector{}) + .orderBy(groupKeysStr, false)//.localMerge(groupKeysStr) + .limit(0, limit, false); + //.topN(groupKeysStr, limit, true).localMerge(groupKeysStr); } if(e.getHead() == "Let"_) { auto [head, unused_, dynamics, unused2_] = std::move(e).decompose(); @@ -963,19 +974,38 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) { if(e.getHead().getName() == "Set") { auto param = std::get(e.getDynamicArguments()[0]); - if(param == "maxThreads"_) { + if(param == "MaxThreads"_) { maxThreads = std::holds_alternative(e.getDynamicArguments()[1]) ? std::get(e.getDynamicArguments()[1]) : std::get(e.getDynamicArguments()[1]); return true; } - if(param == "internalBatchNumRows"_) { + if(param == "NumDrivers"_) { + numDrivers = std::holds_alternative(e.getDynamicArguments()[1]) + ? std::get(e.getDynamicArguments()[1]) + : std::get(e.getDynamicArguments()[1]); + return true; + } + if(param == "InputBatchNumSplits"_) { + inputBatchNumSplits = std::holds_alternative(e.getDynamicArguments()[1]) + ? std::get(e.getDynamicArguments()[1]) + : std::get(e.getDynamicArguments()[1]); + return true; + } + if(param == "InputBatchNumRows"_) { + // overrides inputBatchNumSplits if > 0 + inputBatchNumRows = std::holds_alternative(e.getDynamicArguments()[1]) + ? std::get(e.getDynamicArguments()[1]) + : std::get(e.getDynamicArguments()[1]); + return true; + } + if(param == "InternalBatchNumRows"_) { internalBatchNumRows = std::holds_alternative(e.getDynamicArguments()[1]) ? std::get(e.getDynamicArguments()[1]) : std::get(e.getDynamicArguments()[1]); return true; } - if(param == "minimumOutputBatchNumRows"_) { + if(param == "MinimumOutputBatchNumRows"_) { minimumOutputBatchNumRows = std::holds_alternative(e.getDynamicArguments()[1]) ? std::get(e.getDynamicArguments()[1]) : std::get(e.getDynamicArguments()[1]); @@ -1000,7 +1030,7 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) { boss::expressions::ExpressionArguments columns; auto evalAndAddOutputSpans = [&, this](auto&& e) { - auto scanIds = std::vector>{}; + auto scanIds = std::vector>{}; auto planNodeIdGenerator = std::make_shared(); int tableCnt = 0; int joinCnt = 0; @@ -1009,7 +1039,11 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) { auto params = std::make_unique(); params->planNode = plan.planNode(); - params->maxDrivers = std::max(1, (maxThreads / (joinCnt + 1)) - 1); + if(numDrivers > 0) { + params->maxDrivers = numDrivers; + } else { + params->maxDrivers = std::max(1, (maxThreads / (joinCnt + 1)) - 1); + } params->copyResult = false; std::shared_ptr executor; if(maxThreads < 2) { @@ -1028,7 +1062,8 @@ boss::Expression Engine::evaluate(boss::ComplexExpression&& e) { std::make_shared(executor.get(), core::QueryConfig{std::move(config)}); std::unique_ptr cursor; - auto results = veloxRunQueryParallel(*params, cursor, scanIds); + auto results = + veloxRunQueryParallel(*params, cursor, scanIds, inputBatchNumRows, inputBatchNumSplits); if(!cursor) { throw std::runtime_error("Query terminated with error"); } diff --git a/BOSSVeloxEngine/Source/BOSSVeloxEngine.hpp b/BOSSVeloxEngine/Source/BOSSVeloxEngine.hpp index 00889c3..28c6f57 100644 --- a/BOSSVeloxEngine/Source/BOSSVeloxEngine.hpp +++ b/BOSSVeloxEngine/Source/BOSSVeloxEngine.hpp @@ -19,7 +19,7 @@ __declspec(dllexport) void reset(); // #define USE_NEW_TABLE_FORMAT -// #define TAKE_OWNERSHIP_OF_TASK_POOLS // requires velox patch to Task.h +#define TAKE_OWNERSHIP_OF_TASK_POOLS // requires velox patch to Task.h // #define DebugInfo @@ -48,12 +48,15 @@ class Engine { threadPools_; int32_t maxThreads = 1; + int32_t numDrivers = 0; // overrides maxThreads if > 0 + int32_t inputBatchNumSplits = 64; + int32_t inputBatchNumRows = 0; // overrides numSplits if > 0 int32_t internalBatchNumRows = 0; int32_t minimumOutputBatchNumRows = 0; bool hashAdaptivityEnabled = true; PlanBuilder buildOperatorPipeline(ComplexExpression&& e, - std::vector>& scanIds, + std::vector>& scanIds, memory::MemoryPool& pool, std::shared_ptr& planNodeIdGenerator, int& tableCnt, int& joinCnt); diff --git a/BOSSVeloxEngine/Source/BossConnector.cpp b/BOSSVeloxEngine/Source/BossConnector.cpp index d2a76b7..3bd80f7 100644 --- a/BOSSVeloxEngine/Source/BossConnector.cpp +++ b/BOSSVeloxEngine/Source/BossConnector.cpp @@ -52,12 +52,50 @@ void BossDataSource::addSplit(std::shared_ptr split) { currentSplit_ = std::dynamic_pointer_cast(split); VELOX_CHECK_NOT_NULL(currentSplit_, "Wrong type of split for BossDataSource."); - spanCountIdx_ = currentSplit_->partNumber; - splitOffset_ = 0; - splitEnd_ = bossSpanRowCountVec_.at(spanCountIdx_) - splitOffset_; + int totalRows = std::accumulate(bossSpanRowCountVec_.begin(), bossSpanRowCountVec_.end(), 0); + float rowsPerPart = (float)totalRows / currentSplit_->totalParts; + + int currentSplitRowStart = rowsPerPart * currentSplit_->partNumber; + int currentSplitRowEnd = rowsPerPart * (currentSplit_->partNumber + 1); + + if(currentSplitRowStart >= totalRows) { + spanCountIdx_ = currentSplit_->totalParts - 1; + splitOffset_ = splitEnd_ = 0; + return; + } + + if(currentSplit_->totalParts <= bossSpanRowCountVec_.size()) { + spanCountIdx_ = currentSplit_->partNumber; + splitOffset_ = 0; + splitEnd_ = bossSpanRowCountVec_.at(spanCountIdx_); + } else { + spanCountIdx_ = 0; + for(auto const& currRowCount : bossSpanRowCountVec_) { + if(currentSplitRowStart < currRowCount) { + // Found the span that contains the start of the current split. + if(currentSplitRowStart < rowsPerPart) { + splitOffset_ = 0; // a little bigger to match the beginning of the span + } else { + splitOffset_ = currentSplitRowStart; + } + if(currentSplitRowEnd > currRowCount) { + splitEnd_ = currRowCount; // a little smaller to match the end of the span + } else { + splitEnd_ = currentSplitRowEnd; + } + break; + } + spanCountIdx_++; + currentSplitRowStart -= currRowCount; + currentSplitRowEnd -= currRowCount; + } + } #ifdef DebugInfo std::cout << "addSplit for table " << bossTableName_ << std::endl; + std::cout << " totalRows: " << totalRows << std::endl; + std::cout << " totalParts: " << currentSplit_->totalParts << std::endl; + std::cout << " partNumber: " << currentSplit_->partNumber << std::endl; std::cout << " bossSpanRowCountVec_.size(): " << bossSpanRowCountVec_.size() << std::endl; std::cout << " spanCountIdx_: " << spanCountIdx_ << std::endl; std::cout << " splitOffset_: " << splitOffset_ << std::endl; @@ -66,10 +104,6 @@ void BossDataSource::addSplit(std::shared_ptr split) { } RowVectorPtr BossDataSource::getBossData(uint64_t length) { -#ifdef DebugInfo - std::cout << "getBossData: spanCountIdx_=" << spanCountIdx_ << " splitOffset_=" << splitOffset_ - << " length=" << length << std::endl; -#endif assert(splitOffset_ <= INT_MAX); assert(length <= INT_MAX); @@ -87,9 +121,14 @@ RowVectorPtr BossDataSource::getBossData(uint64_t length) { std::optional BossDataSource::next(uint64_t size, ContinueFuture& /*future*/) { VELOX_CHECK_NOT_NULL(currentSplit_, "No split to process. Call addSplit() first."); - auto maxRows = std::min(size, (splitEnd_ - splitOffset_)); + // auto maxRows = std::min(size, (splitEnd_ - splitOffset_)); + auto maxRows = splitEnd_ - splitOffset_; auto outputVector = getBossData(maxRows); +#ifdef DebugInfo + std::cout << "requested size: " << size << ", maxRows: " << maxRows << std::endl; +#endif // DebugInfo + // If the split is exhausted. if(!outputVector || outputVector->size() == 0) { currentSplit_ = nullptr; diff --git a/BOSSVeloxEngine/Source/BridgeVelox.cpp b/BOSSVeloxEngine/Source/BridgeVelox.cpp index 571c561..61ea6cd 100644 --- a/BOSSVeloxEngine/Source/BridgeVelox.cpp +++ b/BOSSVeloxEngine/Source/BridgeVelox.cpp @@ -147,15 +147,22 @@ std::vector myReadCursor(CursorParameters const& params, std::vector veloxRunQueryParallel(CursorParameters const& params, std::unique_ptr& cursor, - std::vector> const& scanIds) { + std::vector> const& scanIds, + size_t batchSize, size_t numSplits) { try { bool noMoreSplits = false; auto addSplits = [&](exec::Task* task) { if(!noMoreSplits) { - for(auto const& [scanId, numSpans] : scanIds) { - for(size_t i = 0; i < numSpans; ++i) { + for(auto const& [scanId, numSpans, totalNumRows] : scanIds) { + if(batchSize > 0) { + numSplits = 2 + totalNumRows / batchSize; + } + if(numSplits < numSpans) { + numSplits = numSpans; + } + for(size_t i = 0; i < numSplits; ++i) { task->addSplit(scanId, exec::Split(std::make_shared( - kBossConnectorId, numSpans, i))); + kBossConnectorId, numSplits, i))); } task->noMoreSplits(scanId); } diff --git a/BOSSVeloxEngine/Source/BridgeVelox.h b/BOSSVeloxEngine/Source/BridgeVelox.h index 0c711c7..2126963 100644 --- a/BOSSVeloxEngine/Source/BridgeVelox.h +++ b/BOSSVeloxEngine/Source/BridgeVelox.h @@ -48,7 +48,8 @@ BufferPtr importFromBossAsOwnerBuffer(BossArray&& bossArray); std::vector veloxRunQueryParallel(CursorParameters const& params, std::unique_ptr& cursor, - std::vector> const& scanIds); + std::vector> const& scanIds, + size_t numRows, size_t numSplits); void veloxPrintResults(std::vector const& results); diff --git a/Benchmarks/BOSSBenchmarks.cpp b/Benchmarks/BOSSBenchmarks.cpp index a007dcd..dac7bdb 100644 --- a/Benchmarks/BOSSBenchmarks.cpp +++ b/Benchmarks/BOSSBenchmarks.cpp @@ -57,6 +57,11 @@ static int BENCHMARK_NUM_WARMPUP_ITERATIONS = 3; static bool MONETDB_MULTITHREADING = true; static int DUCKDB_MAX_THREADS = 100; +static int BOSS_MAX_THREADS = 20; +static int VELOX_NUM_DRIVERS = 0; // > 0 : overrides BOSS_MAX_THREADS +static int VELOX_NUM_SPLITS = 512; +static int VELOX_BATCH_SIZE = 100000; // > 0: overrides VELOX_NUM_SPLITS + static bool USE_FIXED_POINT_NUMERIC_TYPE = false; static bool DISABLE_MMAP_CACHE = false; @@ -153,6 +158,10 @@ static void initBOSSEngine_TPCH(int dataSize, int64_t storageBlockSize) { checkForErrors(eval("Set"_("ArrayFireEngineCopyDataIn"_, BENCHMARK_DATA_COPY_IN))); checkForErrors(eval("Set"_("ArrayFireEngineCopyDataOut"_, BENCHMARK_DATA_COPY_OUT))); checkForErrors(eval("Set"_("DisableGatherOperator"_, DISABLE_GATHER_OPERATOR))); + checkForErrors(eval("Set"_("MaxThreads"_, BOSS_MAX_THREADS))); + checkForErrors(eval("Set"_("NumDrivers"_, VELOX_NUM_DRIVERS))); + checkForErrors(eval("Set"_("InputBatchNumSplits"_, VELOX_NUM_SPLITS))); + checkForErrors(eval("Set"_("InputBatchNumRows"_, VELOX_BATCH_SIZE))); checkForErrors( eval("CreateTable"_("LINEITEM"_, "l_orderkey"_, "l_partkey"_, "l_suppkey"_, "l_linenumber"_, @@ -819,7 +828,6 @@ static auto& monetdbQueries() { " nation,"s " o_year desc;"s}, {TPCH_Q18, "select"s - " c_name,"s " c_custkey,"s " o_orderkey,"s " o_orderdate,"s @@ -943,7 +951,6 @@ static auto& duckdbQueries() { " nation," " o_year DESC;"}, {TPCH_Q18, "SELECT" - " c_name," " c_custkey," " o_orderkey," " o_orderdate," @@ -1254,6 +1261,22 @@ void initAndRunBenchmarks(int argc, char** argv) { if(++i < argc) { DUCKDB_MAX_THREADS = atoi(argv[i]); } + } else if(std::string("--max-threads") == argv[i]) { + if(++i < argc) { + BOSS_MAX_THREADS = atoi(argv[i]); + } + } else if(std::string("--num-drivers") == argv[i]) { + if(++i < argc) { + VELOX_NUM_DRIVERS = atoi(argv[i]); + } + } else if(std::string("--velox-num-splits") == argv[i]) { + if(++i < argc) { + VELOX_NUM_SPLITS = atoi(argv[i]); + } + } else if(std::string("--velox-batch-size") == argv[i]) { + if(++i < argc) { + VELOX_BATCH_SIZE = atoi(argv[i]); + } } else if(std::string("--monetdb-enable-multithreading") == argv[i]) { MONETDB_MULTITHREADING = true; } else if(std::string("--fixed-point-numeric-type") == argv[i]) { @@ -1269,8 +1292,7 @@ void initAndRunBenchmarks(int argc, char** argv) { } } // register TPC-H benchmarks - for(int dataSize : - std::vector{1, 10, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000}) { + for(int dataSize : std::vector{1, 10, 100, 1000, 2000, 5000, 10000, 20000, 50000, 100000}) { for(int engine = ENGINE_START; engine < ENGINE_END; ++engine) { for(int64_t blockSize : (BENCHMARK_STORAGE_BLOCK_SIZE && engine == BOSS