WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 95 additions & 19 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/cache/block_file_cache.h"

#include <cstdio>
#include <exception>
#include <fstream>
#include <unordered_set>

Expand Down Expand Up @@ -56,6 +57,85 @@
namespace doris::io {
#include "common/compile_check_begin.h"

// Insert a block pointer into one shard while swallowing allocation failures.
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block) {
if (!block) {
return false;
}
try {
auto* raw_ptr = block.get();
auto idx = shard_index(raw_ptr);
auto& shard = _shards[idx];
std::lock_guard lock(shard.mutex);
auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
if (inserted) {
_size.fetch_add(1, std::memory_order_relaxed);
}
return inserted;
} catch (const std::exception& e) {
LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
} catch (...) {
LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
}
return false;
}

// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
if (limit == 0 || output == nullptr) {
return 0;
}
size_t drained = 0;
try {
output->reserve(output->size() + std::min(limit, size()));
for (auto& shard : _shards) {
if (drained >= limit) {
break;
}
std::lock_guard lock(shard.mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just "drain" the entire shard instead of draining one by one.
we dont actually care about the order here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we want release the _mutex for a while if the for loop takes too long.

auto it = shard.entries.begin();
size_t shard_drained = 0;
while (it != shard.entries.end() && drained + shard_drained < limit) {
output->emplace_back(std::move(it->second));
it = shard.entries.erase(it);
++shard_drained;
}
if (shard_drained > 0) {
_size.fetch_sub(shard_drained, std::memory_order_relaxed);
drained += shard_drained;
}
}
} catch (const std::exception& e) {
LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
} catch (...) {
LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
}
return drained;
}

// Remove every pending block, guarding against unexpected exceptions.
void NeedUpdateLRUBlocks::clear() {
try {
for (auto& shard : _shards) {
std::lock_guard lock(shard.mutex);
if (!shard.entries.empty()) {
auto removed = shard.entries.size();
shard.entries.clear();
_size.fetch_sub(removed, std::memory_order_relaxed);
}
}
} catch (const std::exception& e) {
LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
} catch (...) {
LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
}
}

size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
DCHECK(ptr != nullptr);
return std::hash<FileBlock*> {}(ptr)&kShardMask;
}

BlockFileCache::BlockFileCache(const std::string& cache_base_path,
const FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
Expand Down Expand Up @@ -627,11 +707,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
}

void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
bool ret = _need_update_lru_blocks.enqueue(block);
if (ret) [[likely]] {
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size_approx();
} else {
LOG_WARNING("Failed to push FileBlockSPtr to _need_update_lru_blocks");
if (_need_update_lru_blocks.insert(std::move(block))) {
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
}

Expand Down Expand Up @@ -2189,8 +2266,7 @@ void BlockFileCache::run_background_evict_in_advance() {

void BlockFileCache::run_background_block_lru_update() {
Thread::set_self_name("run_background_block_lru_update");
FileBlockSPtr block;
size_t batch_count = 0;
std::vector<FileBlockSPtr> batch;
while (!_close) {
int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
size_t batch_limit =
Expand All @@ -2203,18 +2279,24 @@ void BlockFileCache::run_background_block_lru_update() {
}
}

batch.clear();
batch.reserve(batch_limit);
size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
if (drained == 0) {
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
continue;
}

int64_t duration_ns = 0;
{
SCOPED_CACHE_LOCK(_mutex, this);
SCOPED_RAW_TIMER(&duration_ns);
while (batch_count < batch_limit && _need_update_lru_blocks.try_dequeue(block)) {
for (auto& block : batch) {
update_block_lru(block, cache_lock);
batch_count++;
}
}
*_update_lru_blocks_latency_us << (duration_ns / 1000);
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size_approx();
batch_count = 0;
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
}

Expand Down Expand Up @@ -2356,14 +2438,8 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size,
}

void BlockFileCache::clear_need_update_lru_blocks() {
constexpr size_t kBatchSize = 1024;
std::vector<FileBlockSPtr> buffer(kBatchSize);
size_t drained = 0;
while ((drained = _need_update_lru_blocks.try_dequeue_bulk(buffer.data(), buffer.size())) > 0) {
for (size_t i = 0; i < drained; ++i) {
buffer[i].reset();
}
}
_need_update_lru_blocks.clear();
*_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}

std::string BlockFileCache::clear_file_cache_directly() {
Expand Down
47 changes: 46 additions & 1 deletion be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
#include <concurrentqueue.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <unordered_map>
#include <vector>

#include "io/cache/cache_lru_dumper.h"
#include "io/cache/file_block.h"
Expand Down Expand Up @@ -73,6 +78,46 @@ class LockScopedTimer {

class FSFileCacheStorage;

// NeedUpdateLRUBlocks keeps FileBlockSPtr entries that require LRU updates in a
// deduplicated, sharded container. Entries are keyed by the raw FileBlock
// pointer so that multiple shared_ptr copies of the same block are treated as a
// single pending update. The structure is thread-safe and optimized for high
// contention insert/drain workloads in the background update thread.
// Note that Blocks are updated in batch, internal order is not important.
class NeedUpdateLRUBlocks {
public:
NeedUpdateLRUBlocks() = default;

// Insert a block into the pending set. Returns true only when the block
// was not already queued. Null inputs are ignored.
bool insert(FileBlockSPtr block);

// Drain up to `limit` unique blocks into `output`. The method returns how
// many blocks were actually drained and shrinks the internal size
// accordingly.
size_t drain(size_t limit, std::vector<FileBlockSPtr>* output);

// Remove every pending block from the structure and reset the size.
void clear();

// Thread-safe approximate size of queued unique blocks.
size_t size() const { return _size.load(std::memory_order_relaxed); }

private:
static constexpr size_t kShardCount = 64;
static constexpr size_t kShardMask = kShardCount - 1;

struct Shard {
std::mutex mutex;
std::unordered_map<FileBlock*, FileBlockSPtr> entries;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need sharedptr to keep reference of file block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we do for safety reasons

};

size_t shard_index(FileBlock* ptr) const;

std::array<Shard, kShardCount> _shards;
std::atomic<size_t> _size {0};
};

// The BlockFileCache is responsible for the management of the blocks
// The current strategies are lru and ttl.

Expand Down Expand Up @@ -581,7 +626,7 @@ class BlockFileCache {
std::unique_ptr<FileCacheStorage> _storage;
std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
std::mutex _dump_lru_queues_mtx;
moodycamel::ConcurrentQueue<FileBlockSPtr> _need_update_lru_blocks;
NeedUpdateLRUBlocks _need_update_lru_blocks;
};

} // namespace doris::io
23 changes: 11 additions & 12 deletions be/test/io/cache/block_file_cache_test_lru_dump.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,19 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) {

// read
ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok());
// order are updated in batch, so wait the former batch complete
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(
reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx)
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), buffer.size()), &bytes_read,
&io_ctx)
.ok());

std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));
// check inital order
std::vector<size_t> initial_offsets;
for (auto it = cache->_normal_queue.begin(); it != cache->_normal_queue.end(); ++it) {
Expand All @@ -555,21 +561,14 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) {
ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), buffer.size()), &bytes_read,
&io_ctx)
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(
reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx)
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));
ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok());

std::vector<size_t> before_updated_offsets;
for (auto it = cache->_normal_queue.begin(); it != cache->_normal_queue.end(); ++it) {
before_updated_offsets.push_back(it->offset);
}
ASSERT_EQ(before_updated_offsets.size(), 3);
ASSERT_EQ(before_updated_offsets[0], 0);
ASSERT_EQ(before_updated_offsets[1], 1024 * 1024);
ASSERT_EQ(before_updated_offsets[2], 1024 * 1024 * 2);

// wait LRU update
std::this_thread::sleep_for(std::chrono::milliseconds(
2 * config::file_cache_background_block_lru_update_interval_ms));

Expand Down
112 changes: 112 additions & 0 deletions be/test/io/cache/need_update_lru_blocks_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <memory>
#include <vector>

#include "io/cache/block_file_cache.h"

namespace doris::io {
namespace {

FileBlockSPtr create_block(int idx) {
FileCacheKey key;
key.hash = UInt128Wrapper(vectorized::UInt128(static_cast<uint64_t>(idx + 1)));
key.offset = static_cast<size_t>(idx * 16);
key.meta.expiration_time = 0;
key.meta.type = FileCacheType::NORMAL;
key.meta.tablet_id = idx;
return std::make_shared<FileBlock>(key, /*size*/ 1, /*mgr*/ nullptr, FileBlock::State::EMPTY);
}

void insert_blocks(NeedUpdateLRUBlocks* pending, int count, int start_idx = 0) {
for (int i = 0; i < count; ++i) {
ASSERT_TRUE(pending->insert(create_block(start_idx + i)))
<< "Block " << (start_idx + i) << " should be inserted";
}
}

} // namespace

TEST(NeedUpdateLRUBlocksTest, InsertRejectsNullAndDeduplicates) {
NeedUpdateLRUBlocks pending;
FileBlockSPtr null_block;
EXPECT_FALSE(pending.insert(null_block));
EXPECT_EQ(0, pending.size());

auto block = create_block(0);
EXPECT_TRUE(pending.insert(block));
EXPECT_EQ(1, pending.size());

EXPECT_FALSE(pending.insert(block)) << "Same pointer should not enqueue twice";
EXPECT_EQ(1, pending.size());
}

TEST(NeedUpdateLRUBlocksTest, DrainHandlesZeroLimitAndNullOutput) {
NeedUpdateLRUBlocks pending;
insert_blocks(&pending, 3);
std::vector<FileBlockSPtr> drained;

EXPECT_EQ(0, pending.drain(0, &drained));
EXPECT_TRUE(drained.empty());
EXPECT_EQ(3, pending.size());

EXPECT_EQ(0, pending.drain(2, nullptr));
EXPECT_EQ(3, pending.size());
}

TEST(NeedUpdateLRUBlocksTest, DrainRespectsLimitAndLeavesRemainder) {
NeedUpdateLRUBlocks pending;
insert_blocks(&pending, 5);
std::vector<FileBlockSPtr> drained;

size_t drained_now = pending.drain(2, &drained);
EXPECT_EQ(2u, drained_now);
EXPECT_EQ(2u, drained.size());
EXPECT_EQ(3u, pending.size());

drained_now = pending.drain(10, &drained);
EXPECT_EQ(3u, drained_now);
EXPECT_EQ(5u, drained.size());
EXPECT_EQ(0u, pending.size());
}

TEST(NeedUpdateLRUBlocksTest, DrainFromEmptyReturnsZero) {
NeedUpdateLRUBlocks pending;
std::vector<FileBlockSPtr> drained;
EXPECT_EQ(0u, pending.drain(4, &drained));
EXPECT_TRUE(drained.empty());
}

TEST(NeedUpdateLRUBlocksTest, ClearIsIdempotent) {
NeedUpdateLRUBlocks pending;
pending.clear();
EXPECT_EQ(0u, pending.size());

insert_blocks(&pending, 4);
EXPECT_EQ(4u, pending.size());

pending.clear();
EXPECT_EQ(0u, pending.size());

std::vector<FileBlockSPtr> drained;
EXPECT_EQ(0u, pending.drain(4, &drained));
}

} // namespace doris::io
Loading