diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 70a513de660938..8c1e5edd64e495 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -21,6 +21,7 @@ #include "io/cache/block_file_cache.h" #include +#include #include #include @@ -56,6 +57,85 @@ namespace doris::io { #include "common/compile_check_begin.h" +// Insert a block pointer into one shard while swallowing allocation failures. +bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) { + if (!block) { + return false; + } + try { + auto* raw_ptr = block.get(); + auto idx = shard_index(raw_ptr); + auto& shard = _shards[idx]; + std::lock_guard lock(shard.mutex); + auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block)); + if (inserted) { + _size.fetch_add(1, std::memory_order_relaxed); + } + return inserted; + } catch (const std::exception& e) { + LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what(); + } catch (...) { + LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error"; + } + return false; +} + +// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures. +size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector* output) { + if (limit == 0 || output == nullptr) { + return 0; + } + size_t drained = 0; + try { + output->reserve(output->size() + std::min(limit, size())); + for (auto& shard : _shards) { + if (drained >= limit) { + break; + } + std::lock_guard lock(shard.mutex); + auto it = shard.entries.begin(); + size_t shard_drained = 0; + while (it != shard.entries.end() && drained + shard_drained < limit) { + output->emplace_back(std::move(it->second)); + it = shard.entries.erase(it); + ++shard_drained; + } + if (shard_drained > 0) { + _size.fetch_sub(shard_drained, std::memory_order_relaxed); + drained += shard_drained; + } + } + } catch (const std::exception& e) { + LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what(); + } catch (...) { + LOG(WARNING) << "Failed to drain LRU update blocks: unknown error"; + } + return drained; +} + +// Remove every pending block, guarding against unexpected exceptions. +void NeedUpdateLRUBlocks::clear() { + try { + for (auto& shard : _shards) { + std::lock_guard lock(shard.mutex); + if (!shard.entries.empty()) { + auto removed = shard.entries.size(); + shard.entries.clear(); + _size.fetch_sub(removed, std::memory_order_relaxed); + } + } + } catch (const std::exception& e) { + LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what(); + } catch (...) { + LOG(WARNING) << "Failed to clear LRU update blocks: unknown error"; + } +} + +size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const { + DCHECK(ptr != nullptr); + return std::hash {}(ptr)&kShardMask; +} + BlockFileCache::BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings) : _cache_base_path(cache_base_path), @@ -627,11 +707,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte } void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) { - bool ret = _need_update_lru_blocks.enqueue(block); - if (ret) [[likely]] { - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size_approx(); - } else { - LOG_WARNING("Failed to push FileBlockSPtr to _need_update_lru_blocks"); + if (_need_update_lru_blocks.insert(std::move(block))) { + *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); } } @@ -2189,8 +2266,7 @@ void BlockFileCache::run_background_evict_in_advance() { void BlockFileCache::run_background_block_lru_update() { Thread::set_self_name("run_background_block_lru_update"); - FileBlockSPtr block; - size_t batch_count = 0; + std::vector batch; while (!_close) { int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms; size_t batch_limit = @@ -2203,18 +2279,24 @@ void BlockFileCache::run_background_block_lru_update() { } } + batch.clear(); + batch.reserve(batch_limit); + size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch); + if (drained == 0) { + *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); + continue; + } + int64_t duration_ns = 0; { SCOPED_CACHE_LOCK(_mutex, this); SCOPED_RAW_TIMER(&duration_ns); - while (batch_count < batch_limit && _need_update_lru_blocks.try_dequeue(block)) { + for (auto& block : batch) { update_block_lru(block, cache_lock); - batch_count++; } } *_update_lru_blocks_latency_us << (duration_ns / 1000); - *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size_approx(); - batch_count = 0; + *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); } } @@ -2356,14 +2438,8 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size, } void BlockFileCache::clear_need_update_lru_blocks() { - constexpr size_t kBatchSize = 1024; - std::vector buffer(kBatchSize); - size_t drained = 0; - while ((drained = _need_update_lru_blocks.try_dequeue_bulk(buffer.data(), buffer.size())) > 0) { - for (size_t i = 0; i < drained; ++i) { - buffer[i].reset(); - } - } + _need_update_lru_blocks.clear(); + *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size(); } std::string BlockFileCache::clear_file_cache_directly() { diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 0d563ec8f025f2..0d8595632d2c0d 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -21,11 +21,16 @@ #include #include +#include +#include #include +#include #include #include #include #include +#include +#include #include "io/cache/cache_lru_dumper.h" #include "io/cache/file_block.h" @@ -73,6 +78,46 @@ class LockScopedTimer { class FSFileCacheStorage; +// NeedUpdateLRUBlocks keeps FileBlockSPtr entries that require LRU updates in a +// deduplicated, sharded container. Entries are keyed by the raw FileBlock +// pointer so that multiple shared_ptr copies of the same block are treated as a +// single pending update. The structure is thread-safe and optimized for high +// contention insert/drain workloads in the background update thread. +// Note that Blocks are updated in batch, internal order is not important. +class NeedUpdateLRUBlocks { +public: + NeedUpdateLRUBlocks() = default; + + // Insert a block into the pending set. Returns true only when the block + // was not already queued. Null inputs are ignored. + bool insert(FileBlockSPtr block); + + // Drain up to `limit` unique blocks into `output`. The method returns how + // many blocks were actually drained and shrinks the internal size + // accordingly. + size_t drain(size_t limit, std::vector* output); + + // Remove every pending block from the structure and reset the size. + void clear(); + + // Thread-safe approximate size of queued unique blocks. + size_t size() const { return _size.load(std::memory_order_relaxed); } + +private: + static constexpr size_t kShardCount = 64; + static constexpr size_t kShardMask = kShardCount - 1; + + struct Shard { + std::mutex mutex; + std::unordered_map entries; + }; + + size_t shard_index(FileBlock* ptr) const; + + std::array _shards; + std::atomic _size {0}; +}; + // The BlockFileCache is responsible for the management of the blocks // The current strategies are lru and ttl. @@ -581,7 +626,7 @@ class BlockFileCache { std::unique_ptr _storage; std::shared_ptr _lru_dump_latency_us; std::mutex _dump_lru_queues_mtx; - moodycamel::ConcurrentQueue _need_update_lru_blocks; + NeedUpdateLRUBlocks _need_update_lru_blocks; }; } // namespace doris::io diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp b/be/test/io/cache/block_file_cache_test_lru_dump.cpp index 99c2c780bed847..65762b216a09b6 100644 --- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp +++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp @@ -534,13 +534,19 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) { // read ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + // order are updated in batch, so wait the former batch complete + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * config::file_cache_background_block_lru_update_interval_ms)); ASSERT_TRUE( reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx) .ok()); + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * config::file_cache_background_block_lru_update_interval_ms)); ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx) .ok()); - + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * config::file_cache_background_block_lru_update_interval_ms)); // check inital order std::vector initial_offsets; for (auto it = cache->_normal_queue.begin(); it != cache->_normal_queue.end(); ++it) { @@ -555,21 +561,14 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) { ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx) .ok()); + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * config::file_cache_background_block_lru_update_interval_ms)); ASSERT_TRUE( reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx) .ok()); + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * config::file_cache_background_block_lru_update_interval_ms)); ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); - - std::vector before_updated_offsets; - for (auto it = cache->_normal_queue.begin(); it != cache->_normal_queue.end(); ++it) { - before_updated_offsets.push_back(it->offset); - } - ASSERT_EQ(before_updated_offsets.size(), 3); - ASSERT_EQ(before_updated_offsets[0], 0); - ASSERT_EQ(before_updated_offsets[1], 1024 * 1024); - ASSERT_EQ(before_updated_offsets[2], 1024 * 1024 * 2); - - // wait LRU update std::this_thread::sleep_for(std::chrono::milliseconds( 2 * config::file_cache_background_block_lru_update_interval_ms)); diff --git a/be/test/io/cache/need_update_lru_blocks_test.cpp b/be/test/io/cache/need_update_lru_blocks_test.cpp new file mode 100644 index 00000000000000..5b41a23bd0ebbd --- /dev/null +++ b/be/test/io/cache/need_update_lru_blocks_test.cpp @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +#include "io/cache/block_file_cache.h" + +namespace doris::io { +namespace { + +FileBlockSPtr create_block(int idx) { + FileCacheKey key; + key.hash = UInt128Wrapper(vectorized::UInt128(static_cast(idx + 1))); + key.offset = static_cast(idx * 16); + key.meta.expiration_time = 0; + key.meta.type = FileCacheType::NORMAL; + key.meta.tablet_id = idx; + return std::make_shared(key, /*size*/ 1, /*mgr*/ nullptr, FileBlock::State::EMPTY); +} + +void insert_blocks(NeedUpdateLRUBlocks* pending, int count, int start_idx = 0) { + for (int i = 0; i < count; ++i) { + ASSERT_TRUE(pending->insert(create_block(start_idx + i))) + << "Block " << (start_idx + i) << " should be inserted"; + } +} + +} // namespace + +TEST(NeedUpdateLRUBlocksTest, InsertRejectsNullAndDeduplicates) { + NeedUpdateLRUBlocks pending; + FileBlockSPtr null_block; + EXPECT_FALSE(pending.insert(null_block)); + EXPECT_EQ(0, pending.size()); + + auto block = create_block(0); + EXPECT_TRUE(pending.insert(block)); + EXPECT_EQ(1, pending.size()); + + EXPECT_FALSE(pending.insert(block)) << "Same pointer should not enqueue twice"; + EXPECT_EQ(1, pending.size()); +} + +TEST(NeedUpdateLRUBlocksTest, DrainHandlesZeroLimitAndNullOutput) { + NeedUpdateLRUBlocks pending; + insert_blocks(&pending, 3); + std::vector drained; + + EXPECT_EQ(0, pending.drain(0, &drained)); + EXPECT_TRUE(drained.empty()); + EXPECT_EQ(3, pending.size()); + + EXPECT_EQ(0, pending.drain(2, nullptr)); + EXPECT_EQ(3, pending.size()); +} + +TEST(NeedUpdateLRUBlocksTest, DrainRespectsLimitAndLeavesRemainder) { + NeedUpdateLRUBlocks pending; + insert_blocks(&pending, 5); + std::vector drained; + + size_t drained_now = pending.drain(2, &drained); + EXPECT_EQ(2u, drained_now); + EXPECT_EQ(2u, drained.size()); + EXPECT_EQ(3u, pending.size()); + + drained_now = pending.drain(10, &drained); + EXPECT_EQ(3u, drained_now); + EXPECT_EQ(5u, drained.size()); + EXPECT_EQ(0u, pending.size()); +} + +TEST(NeedUpdateLRUBlocksTest, DrainFromEmptyReturnsZero) { + NeedUpdateLRUBlocks pending; + std::vector drained; + EXPECT_EQ(0u, pending.drain(4, &drained)); + EXPECT_TRUE(drained.empty()); +} + +TEST(NeedUpdateLRUBlocksTest, ClearIsIdempotent) { + NeedUpdateLRUBlocks pending; + pending.clear(); + EXPECT_EQ(0u, pending.size()); + + insert_blocks(&pending, 4); + EXPECT_EQ(4u, pending.size()); + + pending.clear(); + EXPECT_EQ(0u, pending.size()); + + std::vector drained; + EXPECT_EQ(0u, pending.drain(4, &drained)); +} + +} // namespace doris::io