WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
Expand Down
134 changes: 134 additions & 0 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "iceberg/snapshot.h"

#include <charconv>

#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
Expand Down Expand Up @@ -75,6 +77,39 @@ std::optional<std::string_view> Snapshot::operation() const {
return std::nullopt;
}

// Parses the "first-row-id" entry of the snapshot summary.
//
// Returns std::nullopt when the entry is absent, or when its value is not a
// complete base-10 int64 (empty, trailing characters such as "12abc", or out of
// range).
std::optional<int64_t> Snapshot::FirstRowId() const {
  const auto it = summary.find("first-row-id");
  if (it == summary.end()) {
    return std::nullopt;
  }

  const std::string& text = it->second;
  const char* const begin = text.data();
  const char* const end = begin + text.size();

  int64_t first_row_id = 0;
  const auto [ptr, ec] = std::from_chars(begin, end, first_row_id);
  // from_chars stops at the first character that does not fit the pattern and
  // still reports success, so also require that the whole value was consumed.
  // Without the ptr check a corrupted value like "12abc" would be read as 12.
  if (ec != std::errc() || ptr != end) {
    return std::nullopt;
  }
  return first_row_id;
}

// Parses the "added-rows" entry of the snapshot summary.
//
// Returns std::nullopt when the entry is absent, or when its value is not a
// complete base-10 int64 (empty, trailing characters such as "7 rows", or out of
// range).
std::optional<int64_t> Snapshot::AddedRows() const {
  const auto it = summary.find("added-rows");
  if (it == summary.end()) {
    return std::nullopt;
  }

  const std::string& text = it->second;
  const char* const begin = text.data();
  const char* const end = begin + text.size();

  int64_t added_rows = 0;
  const auto [ptr, ec] = std::from_chars(begin, end, added_rows);
  // from_chars stops at the first character that does not fit the pattern and
  // still reports success, so also require that the whole value was consumed.
  // Without the ptr check a partially numeric value would be silently truncated.
  if (ec != std::errc() || ptr != end) {
    return std::nullopt;
  }
  return added_rows;
}

bool Snapshot::Equals(const Snapshot& other) const {
if (this == &other) {
return true;
Expand Down Expand Up @@ -141,4 +176,103 @@ Result<std::span<ManifestFile>> CachedSnapshot::DeleteManifests(
return std::span<ManifestFile>(cache.first.data() + delete_start, delete_count);
}

// SnapshotRef::Builder implementation

SnapshotRef::Builder::Builder(SnapshotRefType type, int64_t snapshot_id)
: type_(type), snapshot_id_(snapshot_id) {}

// Convenience factory: a builder of type kTag pointing at `snapshot_id`.
SnapshotRef::Builder SnapshotRef::Builder::TagBuilder(int64_t snapshot_id) {
  return BuilderFor(snapshot_id, SnapshotRefType::kTag);
}

// Convenience factory: a builder of type kBranch pointing at `snapshot_id`.
SnapshotRef::Builder SnapshotRef::Builder::BranchBuilder(int64_t snapshot_id) {
  return BuilderFor(snapshot_id, SnapshotRefType::kBranch);
}

// Generic factory: a builder of the requested reference type for `snapshot_id`.
// Note the argument order here (id, type) is the reverse of the private
// constructor's (type, id).
SnapshotRef::Builder SnapshotRef::Builder::BuilderFor(int64_t snapshot_id,
                                                      SnapshotRefType type) {
  return Builder{type, snapshot_id};
}

// Creates a builder that mirrors `ref`: same type, same snapshot ID, and the
// same retention settings. Delegates to the two-argument overload, passing the
// reference's own snapshot ID.
SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref) {
  return BuilderFrom(ref, ref.snapshot_id);
}

// Creates a builder with the type and retention settings of `ref`, but pointing
// at `snapshot_id` instead of the reference's own snapshot.
SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref,
                                                       int64_t snapshot_id) {
  Builder result(ref.type(), snapshot_id);

  // Tags only carry a maximum reference age.
  if (ref.type() != SnapshotRefType::kBranch) {
    result.max_ref_age_ms_ = std::get<SnapshotRef::Tag>(ref.retention).max_ref_age_ms;
    return result;
  }

  // Branches carry all three retention settings.
  const auto& source = std::get<SnapshotRef::Branch>(ref.retention);
  result.min_snapshots_to_keep_ = source.min_snapshots_to_keep;
  result.max_snapshot_age_ms_ = source.max_snapshot_age_ms;
  result.max_ref_age_ms_ = source.max_ref_age_ms;
  return result;
}

// Sets (or clears, with nullopt) the minimum number of snapshots to keep.
//
// A provided value is rejected, and the stored setting left untouched, when this
// builder is for a tag or when the value is not strictly positive. Rejections
// are recorded through AddError() instead of being returned directly.
SnapshotRef::Builder& SnapshotRef::Builder::MinSnapshotsToKeep(
    std::optional<int32_t> value) {
  if (value.has_value()) {
    if (type_ == SnapshotRefType::kTag) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Tags do not support setting minSnapshotsToKeep");
    }
    if (*value <= 0) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Min snapshots to keep must be greater than 0");
    }
  }

  min_snapshots_to_keep_ = value;
  return *this;
}

// Sets (or clears, with nullopt) the maximum snapshot age in milliseconds.
//
// A provided value is rejected, and the stored setting left untouched, when this
// builder is for a tag or when the value is not strictly positive. Rejections
// are recorded through AddError() instead of being returned directly.
SnapshotRef::Builder& SnapshotRef::Builder::MaxSnapshotAgeMs(
    std::optional<int64_t> value) {
  if (value.has_value()) {
    if (type_ == SnapshotRefType::kTag) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Tags do not support setting maxSnapshotAgeMs");
    }
    if (*value <= 0) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Max snapshot age must be greater than 0 ms");
    }
  }

  max_snapshot_age_ms_ = value;
  return *this;
}

// Sets (or clears, with nullopt) the maximum reference age in milliseconds.
//
// Accepted for both branches and tags. A provided value that is not strictly
// positive is rejected through AddError() and the stored setting is left
// untouched.
SnapshotRef::Builder& SnapshotRef::Builder::MaxRefAgeMs(std::optional<int64_t> value) {
  const bool is_non_positive = value.has_value() && *value <= 0;
  if (is_non_positive) {
    return AddError(ErrorKind::kInvalidArgument,
                    "Max reference age must be greater than 0");
  }

  max_ref_age_ms_ = value;
  return *this;
}

// Produces the SnapshotRef described by this builder.
//
// CheckErrors() is consulted first and its failure, if any, is propagated by
// ICEBERG_RETURN_UNEXPECTED. Otherwise the retention alternative matching the
// builder's type is filled in: a Tag carries only max_ref_age_ms, a Branch
// carries all three retention settings.
Result<SnapshotRef> SnapshotRef::Builder::Build() const {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());

  if (type_ != SnapshotRefType::kBranch) {
    return SnapshotRef{.snapshot_id = snapshot_id_,
                       .retention = SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms_}};
  }

  return SnapshotRef{
      .snapshot_id = snapshot_id_,
      .retention = SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep_,
                                       .max_snapshot_age_ms = max_snapshot_age_ms_,
                                       .max_ref_age_ms = max_ref_age_ms_}};
}

} // namespace iceberg
85 changes: 85 additions & 0 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/lazy.h"
#include "iceberg/util/timepoint.h"

Expand Down Expand Up @@ -119,6 +120,67 @@ struct ICEBERG_EXPORT SnapshotRef {
return lhs.Equals(rhs);
}

/// \brief Builder class for constructing SnapshotRef objects
///
/// Instances are obtained from the static factory functions; the constructor is
/// private. Setters return the builder for chaining and report invalid arguments
/// through the inherited ErrorCollector rather than failing immediately; Build()
/// checks the collected errors before producing a SnapshotRef.
class ICEBERG_EXPORT Builder : public ErrorCollector {
public:
/// \brief Create a builder for a tag reference
/// \param snapshot_id The snapshot ID for the tag
/// \return A new Builder instance for a tag
static Builder TagBuilder(int64_t snapshot_id);

/// \brief Create a builder for a branch reference
/// \param snapshot_id The snapshot ID for the branch
/// \return A new Builder instance for a branch
static Builder BranchBuilder(int64_t snapshot_id);

/// \brief Create a builder from an existing SnapshotRef
///
/// The type and snapshot ID are taken from `ref`. For a branch, the minimum
/// snapshots to keep, maximum snapshot age and maximum reference age are copied;
/// for a tag, only the maximum reference age is copied.
///
/// \param ref The existing reference to copy properties from
/// \return A new Builder instance with properties from the existing ref
static Builder BuilderFrom(const SnapshotRef& ref);

/// \brief Create a builder from an existing SnapshotRef with a new snapshot ID
///
/// Copies the same type and retention settings as the single-argument overload,
/// but uses `snapshot_id` instead of the snapshot ID stored in `ref`.
///
/// \param ref The existing reference to copy properties from
/// \param snapshot_id The new snapshot ID to use
/// \return A new Builder instance with properties from the existing ref but new
/// snapshot ID
static Builder BuilderFrom(const SnapshotRef& ref, int64_t snapshot_id);

/// \brief Create a builder for a specific type
/// \param snapshot_id The snapshot ID
/// \param type The type of reference (branch or tag)
/// \return A new Builder instance
static Builder BuilderFor(int64_t snapshot_id, SnapshotRefType type);

/// \brief Set the minimum number of snapshots to keep (branch only)
///
/// An InvalidArgument error is recorded, and the stored value left unchanged, if
/// a value is supplied for a tag builder or if the value is not greater than 0.
///
/// \param value The minimum number of snapshots to keep, or nullopt for default
/// \return Reference to this builder for method chaining
Builder& MinSnapshotsToKeep(std::optional<int32_t> value);

/// \brief Set the maximum snapshot age in milliseconds (branch only)
///
/// An InvalidArgument error is recorded, and the stored value left unchanged, if
/// a value is supplied for a tag builder or if the value is not greater than 0.
///
/// \param value The maximum snapshot age in milliseconds, or nullopt for default
/// \return Reference to this builder for method chaining
Builder& MaxSnapshotAgeMs(std::optional<int64_t> value);

/// \brief Set the maximum reference age in milliseconds
///
/// Applies to both branches and tags. An InvalidArgument error is recorded, and
/// the stored value left unchanged, if the value is not greater than 0.
///
/// \param value The maximum reference age in milliseconds, or nullopt for default
/// \return Reference to this builder for method chaining
Builder& MaxRefAgeMs(std::optional<int64_t> value);

/// \brief Build the SnapshotRef
///
/// The builder is not modified, so Build() may be called more than once.
///
/// \return A Result containing the SnapshotRef instance, or an error if validation
/// failed
Result<SnapshotRef> Build() const;

private:
// Private: instances are created through the static factory functions above.
explicit Builder(SnapshotRefType type, int64_t snapshot_id);

// Kind of reference being built (branch or tag); fixed at construction.
SnapshotRefType type_;
// ID of the snapshot the reference will point to.
int64_t snapshot_id_;
// Retention settings; nullopt means "not set". The first two are only ever
// populated for branches.
std::optional<int32_t> min_snapshots_to_keep_;
std::optional<int64_t> max_snapshot_age_ms_;
std::optional<int64_t> max_ref_age_ms_;
};

private:
/// \brief Compare two snapshot refs for equality.
bool Equals(const SnapshotRef& other) const;
Expand Down Expand Up @@ -253,6 +315,29 @@ struct ICEBERG_EXPORT Snapshot {
/// unknown.
std::optional<std::string_view> operation() const;

/// \brief The row-id of the first newly added row in this snapshot.
///
/// All rows added in this snapshot will have a row-id assigned to them greater than
/// this value. All rows with a row-id less than this value were created in a snapshot
/// that was added to the table (but not necessarily committed to this branch) in the
/// past.
///
/// The value is read from the `first-row-id` entry of the snapshot summary.
///
/// \return the first row-id to be used in this snapshot or nullopt when row lineage
/// is not supported (the summary has no `first-row-id` entry) or the entry cannot be
/// parsed as a 64-bit integer
std::optional<int64_t> FirstRowId() const;

/// \brief The upper bound of number of rows with assigned row IDs in this snapshot.
///
/// It can be used safely to increment the table's `next-row-id` during a commit. It
/// can be more than the number of rows added in this snapshot and include some
/// existing rows.
///
/// This field is optional but is required when the table version supports row lineage.
///
/// The value is read from the `added-rows` entry of the snapshot summary.
///
/// \return the upper bound of number of rows with assigned row IDs in this snapshot
/// or nullopt if the value was not stored or cannot be parsed as a 64-bit integer.
std::optional<int64_t> AddedRows() const;

/// \brief Compare two snapshots for equality.
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
return lhs.Equals(rhs);
Expand Down
20 changes: 10 additions & 10 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {

virtual ~Table();

/// \brief Return the identifier of this table
/// \brief Returns the identifier of this table
const TableIdentifier& name() const { return identifier_; }

/// \brief Returns the UUID of the table
Expand All @@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// \brief Return the schema for this table, return NotFoundError if not found
Result<std::shared_ptr<Schema>> schema() const;

/// \brief Return a map of schema for this table
/// \brief Returns a map of schema for this table
Result<
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
schemas() const;

/// \brief Return the partition spec for this table, return NotFoundError if not found
/// \brief Returns the partition spec for this table, return NotFoundError if not found
Result<std::shared_ptr<PartitionSpec>> spec() const;

/// \brief Return a map of partition specs for this table
/// \brief Returns a map of partition specs for this table
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
specs() const;

/// \brief Return the sort order for this table, return NotFoundError if not found
/// \brief Returns the sort order for this table, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> sort_order() const;

/// \brief Return a map of sort order IDs to sort orders for this table
/// \brief Returns a map of sort order IDs to sort orders for this table
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
sort_orders() const;

/// \brief Return a map of string properties for this table
/// \brief Returns the properties of this table
const TableProperties& properties() const;

/// \brief Return the table's metadata file location
/// \brief Returns the table's metadata file location
std::string_view metadata_file_location() const;

/// \brief Return the table's base location
/// \brief Returns the table's base location
std::string_view location() const;

/// \brief Returns the time when this table was last updated
TimePointMs last_updated_ms() const;

/// \brief Return the table's current snapshot, return NotFoundError if not found
/// \brief Returns the table's current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;

/// \brief Get the snapshot of this table with the given id
Expand Down
Loading
Loading