WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions src/iceberg/delete_file_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Result<bool> CanContainEqDeletesForFile(const DataFile& data_file,

Status PositionDeletes::Add(ManifestEntry&& entry) {
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
"Missing sequence number for position delete: {}",
"Missing sequence number from position delete: {}",
entry.data_file->file_path);
files_.emplace_back(std::move(entry));
indexed_ = false;
Expand Down Expand Up @@ -213,7 +213,7 @@ void PositionDeletes::IndexIfNeeded() {

Status EqualityDeletes::Add(ManifestEntry&& entry) {
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
"Missing sequence number for equality delete: {}",
"Missing sequence number from equality delete: {}",
entry.data_file->file_path);
files_.emplace_back(&schema_, std::move(entry));
indexed_ = false;
Expand Down Expand Up @@ -343,7 +343,7 @@ Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForEntry(
const ManifestEntry& entry) const {
ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file");
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
"Missing sequence number for data file: {}",
"Missing sequence number from data file: {}",
entry.data_file->file_path);
return ForDataFile(entry.sequence_number.value(), *entry.data_file);
}
Expand Down Expand Up @@ -396,8 +396,11 @@ Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindEqPartitionD
return {};
}

auto deletes =
eq_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition);
ICEBERG_PRECHECK(data_file.partition_spec_id.has_value(),
"Missing partition spec id from data file {}", data_file.file_path);

auto deletes = eq_deletes_by_partition_->get(data_file.partition_spec_id.value(),
data_file.partition);
if (!deletes.has_value()) {
return {};
}
Expand All @@ -410,8 +413,11 @@ Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindPosPartition
return {};
}

auto deletes =
pos_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition);
ICEBERG_PRECHECK(data_file.partition_spec_id.has_value(),
"Missing partition spec id from data file {}", data_file.file_path);

auto deletes = pos_deletes_by_partition_->get(data_file.partition_spec_id.value(),
data_file.partition);
if (!deletes.has_value()) {
return {};
}
Expand Down Expand Up @@ -606,7 +612,7 @@ Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
for (auto& entry : entries) {
ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_CHECK(entry.sequence_number.has_value(),
"Missing sequence number for delete file: {}",
"Missing sequence number from delete file: {}",
entry.data_file->file_path);
if (entry.sequence_number.value() > min_sequence_number_) {
auto& file = *entry.data_file;
Expand All @@ -628,8 +634,8 @@ Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
Status DeleteFileIndex::Builder::AddDV(
std::unordered_map<std::string, ManifestEntry>& dv_by_path, ManifestEntry&& entry) {
ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_PRECHECK(entry.sequence_number.has_value(), "Missing sequence number for DV {}",
entry.data_file->file_path);
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
"Missing sequence number from DV {}", entry.data_file->file_path);

const auto& path = entry.data_file->referenced_data_file;
ICEBERG_PRECHECK(path.has_value(), "DV must have a referenced data file");
Expand All @@ -649,7 +655,7 @@ Status DeleteFileIndex::Builder::AddPositionDelete(
ManifestEntry&& entry) {
ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
"Missing sequence number for position delete {}",
"Missing sequence number from position delete {}",
entry.data_file->file_path);

ICEBERG_ASSIGN_OR_RAISE(auto referenced_path,
Expand All @@ -664,7 +670,10 @@ Status DeleteFileIndex::Builder::AddPositionDelete(
ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
} else {
// Partition-scoped position delete
int32_t spec_id = entry.data_file->partition_spec_id;
ICEBERG_PRECHECK(entry.data_file->partition_spec_id.has_value(),
"Missing partition spec id from position delete {}",
entry.data_file->file_path);
int32_t spec_id = entry.data_file->partition_spec_id.value();
const auto& partition = entry.data_file->partition;

auto existing = deletes_by_partition.get(spec_id, partition);
Expand All @@ -686,10 +695,13 @@ Status DeleteFileIndex::Builder::AddEqualityDelete(
ManifestEntry&& entry) {
ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_PRECHECK(entry.sequence_number.has_value(),
"Missing sequence number for equality delete {}",
"Missing sequence number from equality delete {}",
entry.data_file->file_path);
ICEBERG_PRECHECK(entry.data_file->partition_spec_id.has_value(),
"Missing partition spec id from equality delete {}",
entry.data_file->file_path);

int32_t spec_id = entry.data_file->partition_spec_id;
int32_t spec_id = entry.data_file->partition_spec_id.value();

auto spec_it = specs_by_id_.find(spec_id);
if (spec_it == specs_by_id_.end()) {
Expand Down
8 changes: 5 additions & 3 deletions src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ struct ICEBERG_EXPORT DataFile {
/// order, and should set sort order id to null. Readers must ignore sort order id for
/// position delete files.
std::optional<int32_t> sort_order_id;
/// This field is not included in spec, so it is not serialized into the manifest file.
/// It is just store in memory representation used in process.
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;
/// Field id: 142
/// The _row_id for the first row in the data file.
///
Expand Down Expand Up @@ -178,6 +175,11 @@ struct ICEBERG_EXPORT DataFile {
/// present
std::optional<int64_t> content_size_in_bytes;

/// \brief Partition spec id for this data file.
/// \note This field is for internal use only and will not be persisted to manifest
/// entry.
std::optional<int32_t> partition_spec_id;

static constexpr int32_t kContentFieldId = 134;
inline static const SchemaField kContent = SchemaField::MakeOptional(
kContentFieldId, "content", int32(),
Expand Down
9 changes: 6 additions & 3 deletions src/iceberg/manifest/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,11 +814,13 @@ Result<InclusiveMetricsEvaluator*> ManifestReaderImpl::GetMetricsEvaluator() {
return metrics_evaluator_.get();
}

bool ManifestReaderImpl::InPartitionSet(const DataFile& file) const {
Result<bool> ManifestReaderImpl::InPartitionSet(const DataFile& file) const {
if (!partition_set_) {
return true;
}
return partition_set_->contains(file.partition_spec_id, file.partition);
ICEBERG_PRECHECK(file.partition_spec_id.has_value(),
"Missing partition spec id from data file {}", file.file_path);
return partition_set_->contains(file.partition_spec_id.value(), file.partition);
}

Status ManifestReaderImpl::OpenReader(std::shared_ptr<Schema> projection) {
Expand Down Expand Up @@ -943,7 +945,8 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_liv
continue;
}
}
if (!InPartitionSet(*entry.data_file)) {
ICEBERG_ASSIGN_OR_RAISE(bool in_partition_set, InPartitionSet(*entry.data_file));
if (!in_partition_set) {
continue;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest/manifest_reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ManifestReaderImpl : public ManifestReader {
Result<InclusiveMetricsEvaluator*> GetMetricsEvaluator();

/// \brief Check if a partition is in the partition set.
bool InPartitionSet(const DataFile& file) const;
Result<bool> InPartitionSet(const DataFile& file) const;

// Fields set at construction
const std::string manifest_path_;
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/test/delete_file_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class DeleteFileIndexTest : public testing::TestWithParam<int> {
.partition = partition,
.record_count = 1,
.file_size_in_bytes = 10,
.partition_spec_id = spec_id,
.referenced_data_file = referenced_file,
.partition_spec_id = spec_id,
});
}

Expand Down Expand Up @@ -141,10 +141,10 @@ class DeleteFileIndexTest : public testing::TestWithParam<int> {
.partition = partition,
.record_count = 1,
.file_size_in_bytes = 10,
.partition_spec_id = spec_id,
.referenced_data_file = referenced_file,
.content_offset = content_offset,
.content_size_in_bytes = content_size,
.partition_spec_id = spec_id,
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/manifest_group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ class ManifestGroupTest : public testing::TestWithParam<int> {
.partition = partition,
.record_count = 1,
.file_size_in_bytes = 10,
.partition_spec_id = spec_id,
.referenced_data_file = referenced_file,
.partition_spec_id = spec_id,
});
}

Expand Down
Loading