diff --git a/velox/connectors/lakehouse/iceberg/CMakeLists.txt b/velox/connectors/lakehouse/iceberg/CMakeLists.txt index 1030ccd8e231..8d976688b467 100644 --- a/velox/connectors/lakehouse/iceberg/CMakeLists.txt +++ b/velox/connectors/lakehouse/iceberg/CMakeLists.txt @@ -21,6 +21,7 @@ velox_add_library( IcebergSplitReader.cpp IcebergSplit.cpp IcebergTableHandle.cpp + PartitionSpec.cpp PositionalDeleteFileReader.cpp IcebergDataSink.cpp) diff --git a/velox/connectors/lakehouse/iceberg/PartitionSpec.cpp b/velox/connectors/lakehouse/iceberg/PartitionSpec.cpp new file mode 100644 index 000000000000..c81979971dda --- /dev/null +++ b/velox/connectors/lakehouse/iceberg/PartitionSpec.cpp @@ -0,0 +1,161 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/lakehouse/iceberg/PartitionSpec.h" + +namespace facebook::velox::connector::lakehouse::iceberg { + +namespace { +/* Returns the lowercase name written to serialized output for a TransformType (for example kBucket gives bucket). Every case in the switch returns, so VELOX_UNREACHABLE is hit only when no case matched. */ std::string transformTypeToString(TransformType type) { + switch (type) { + case TransformType::kIdentity: + return "identity"; + case TransformType::kHour: + return "hour"; + case TransformType::kDay: + return "day"; + case TransformType::kMonth: + return "month"; + case TransformType::kYear: + return "year"; + case TransformType::kBucket: + return "bucket"; + case TransformType::kTruncate: + return "truncate"; + } + VELOX_UNREACHABLE("Unknown TransformType"); +} + +/* Inverse of transformTypeToString: maps a name back to its enumerator using exact string equality. Any other input fails through VELOX_USER_FAIL, with the offending string included in the message. */ TransformType transformTypeFromString(const std::string& str) { + if (str == "identity") { + return TransformType::kIdentity; + } else if (str == "hour") { + return TransformType::kHour; + } else if (str == "day") { + return TransformType::kDay; + } else if (str == "month") { + return TransformType::kMonth; + } else if (str == "year") { + return TransformType::kYear; + } else if (str == "bucket") { + return TransformType::kBucket; + } else if (str == "truncate") { + return TransformType::kTruncate; + } else { + VELOX_USER_FAIL("Unknown TransformType: {}", str); + } +} +} // anonymous namespace + +/* Builds a folly::dynamic object with the keys name (always the literal Field), fieldName, transformType and parameter. The parameter key is always present: it holds the value when the optional is set and null otherwise. */ folly::dynamic IcebergPartitionSpec::Field::serialize() const { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = "Field"; + obj["fieldName"] = name; + obj["transformType"] = transformTypeToString(transformType); + if (parameter.has_value()) { + obj["parameter"] = parameter.value(); + } else { + obj["parameter"] = nullptr; + } + return obj; +} + +/* Rebuilds a Field from the object layout produced by serialize(). VELOX_CHECK rejects a non-object input, a missing or non-string fieldName, and a missing or non-string transformType. The parameter key may be absent or null (leaving the optional empty); otherwise it must be an integer. The context argument is not read in this function. NOTE(review): the asInt() result for parameter is passed through a static_cast with no range check, and the cast target is not visible in this text; confirm that out-of-range values cannot be silently truncated. */ std::shared_ptr IcebergPartitionSpec::Field::create( + const folly::dynamic& obj, + void* context) { + VELOX_CHECK(obj.isObject(), "Field::create expects object"); + + const auto* fieldNamePtr = obj.get_ptr("fieldName"); + VELOX_CHECK(fieldNamePtr, "Field::create: missing 'fieldName'"); + VELOX_CHECK( + fieldNamePtr->isString(), "Field::create: 'fieldName' must be string"); + auto fieldName = 
fieldNamePtr->asString(); + + const auto* transformTypePtr = obj.get_ptr("transformType"); + VELOX_CHECK(transformTypePtr, "Field::create: missing 'transformType'"); + VELOX_CHECK( + transformTypePtr->isString(), + "Field::create: 'transformType' must be string"); + auto transformType = transformTypeFromString(transformTypePtr->asString()); + + std::optional parameter = std::nullopt; + const auto* parameterPtr = obj.get_ptr("parameter"); + if (parameterPtr && !parameterPtr->isNull()) { + VELOX_CHECK( + parameterPtr->isInt(), + "Field::create: 'parameter' must be integer if present"); + parameter = static_cast(parameterPtr->asInt()); + } + + return std::make_shared(fieldName, transformType, parameter); +} + +/* Registers Field::create under the key Field in the registry returned by DeserializationWithContextRegistryForSharedPtr(). NOTE(review): Field is a very generic key; confirm that no other type registers the same key in this registry. */ void IcebergPartitionSpec::Field::registerSerDe() { + auto& registry = DeserializationWithContextRegistryForSharedPtr(); + registry.Register("Field", Field::create); +} + +/* Builds a folly::dynamic object with the keys name (always the literal IcebergPartitionSpec), specId and fields. The fields value is an array holding the serialize() result of each field in vector order; it is an empty array when there are no fields. */ folly::dynamic IcebergPartitionSpec::serialize() const { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = "IcebergPartitionSpec"; + obj["specId"] = specId; + + folly::dynamic fieldsArray = folly::dynamic::array; + fieldsArray.reserve(fields.size()); + for (const auto& field : fields) { + fieldsArray.push_back(field.serialize()); + } + obj["fields"] = std::move(fieldsArray); + return obj; +} + +/* Rebuilds a partition spec from the object layout produced by serialize(). VELOX_CHECK rejects a non-object input, a missing or non-integer specId, and a missing or non-array fields entry. Each array element is passed to Field::create together with the same context pointer, and the pointee of the returned shared pointer is copied into a local vector. NOTE(review): the asInt() result for specId goes through an unchecked static_cast whose target type is not visible in this text; confirm the intended range. */ std::shared_ptr IcebergPartitionSpec::create( + const folly::dynamic& obj, + void* context) { + VELOX_CHECK(obj.isObject(), "IcebergPartitionSpec::create expects object"); + + const auto* specIdPtr = obj.get_ptr("specId"); + VELOX_CHECK(specIdPtr, "IcebergPartitionSpec::create: missing 'specId'"); + VELOX_CHECK( + specIdPtr->isInt(), + "IcebergPartitionSpec::create: 'specId' must be integer"); + auto specId = static_cast(specIdPtr->asInt()); + + const auto* fieldsPtr = obj.get_ptr("fields"); + VELOX_CHECK(fieldsPtr, "IcebergPartitionSpec::create: missing 'fields'"); + VELOX_CHECK( + fieldsPtr->isArray(), + "IcebergPartitionSpec::create: 'fields' must be array"); + + std::vector 
deserializedFields; + deserializedFields.reserve(fieldsPtr->size()); + for (const auto& fieldObj : *fieldsPtr) { + auto fieldPtr = Field::create(fieldObj, context); + auto field = std::static_pointer_cast(fieldPtr); + deserializedFields.push_back(*field); + } + + /* NOTE(review): the IcebergPartitionSpec constructor declared in PartitionSpec.h takes the vector by const reference and copies it into the member, so the std::move below does not avoid a copy. */ return std::make_shared( + specId, std::move(deserializedFields)); +} + +/* Registers the Field deserializer first, then registers IcebergPartitionSpec::create under the key IcebergPartitionSpec in the same registry accessor. */ void IcebergPartitionSpec::registerSerDe() { + Field::registerSerDe(); + auto& registry = DeserializationWithContextRegistryForSharedPtr(); + registry.Register("IcebergPartitionSpec", IcebergPartitionSpec::create); +} + +} // namespace facebook::velox::connector::lakehouse::iceberg diff --git a/velox/connectors/lakehouse/iceberg/PartitionSpec.h b/velox/connectors/lakehouse/iceberg/PartitionSpec.h index c4c89030d213..d2b0727f8d31 100644 --- a/velox/connectors/lakehouse/iceberg/PartitionSpec.h +++ b/velox/connectors/lakehouse/iceberg/PartitionSpec.h @@ -19,6 +19,7 @@ #include #include #include +#include "velox/common/serialization/Serializable.h" namespace facebook::velox::connector::lakehouse::iceberg { @@ -32,8 +33,8 @@ enum class TransformType { kTruncate }; -struct IcebergPartitionSpec { - struct Field { +/* The spec and its nested Field both derive from ISerializable in this change; each gains a serialize() override plus static create() and registerSerDe() members. */ struct IcebergPartitionSpec : public ISerializable { + struct Field : public ISerializable { // The column name and type of this partition field as it appears in the + // partition spec. The column can be a nested column in struct field. 
std::string name; @@ -51,6 +52,14 @@ struct IcebergPartitionSpec { TransformType _transform, std::optional _parameter) : name(_name), transformType(_transform), parameter(_parameter) {} + + /* Serializes this field into a folly::dynamic value. */ folly::dynamic serialize() const override; + + /* Static factory that rebuilds a Field from a folly::dynamic value; it also receives an opaque context pointer. */ static std::shared_ptr create( + const folly::dynamic& obj, + void* context); + + /* Registers the Field factory for deserialization. */ static void registerSerDe(); }; const int32_t specId; @@ -58,6 +67,14 @@ struct IcebergPartitionSpec { IcebergPartitionSpec(int32_t _specId, const std::vector& _fields) : specId(_specId), fields(_fields) {} + + /* Serializes the spec id and all fields into a folly::dynamic value. */ folly::dynamic serialize() const override; + + /* Static factory that rebuilds a spec from a folly::dynamic value; it also receives an opaque context pointer. */ static std::shared_ptr create( + const folly::dynamic& obj, + void* context); + + /* Registers the Field factory and the spec factory for deserialization. */ static void registerSerDe(); }; } // namespace facebook::velox::connector::lakehouse::iceberg diff --git a/velox/connectors/lakehouse/iceberg/tests/CMakeLists.txt b/velox/connectors/lakehouse/iceberg/tests/CMakeLists.txt index bacba1f870ca..59ea183c8075 100644 --- a/velox/connectors/lakehouse/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/lakehouse/iceberg/tests/CMakeLists.txt @@ -46,6 +46,7 @@ if(VELOX_BUILD_TESTING AND (NOT VELOX_DISABLE_GOOGLETEST)) IcebergSplitReaderBenchmarkTest.cpp IcebergTableHandleTest.cpp IcebergTestBase.cpp + PartitionSpecTest.cpp Main.cpp) add_test(velox_lakehouse_iceberg_test velox_lakehouse_iceberg_test) diff --git a/velox/connectors/lakehouse/iceberg/tests/PartitionSpecTest.cpp b/velox/connectors/lakehouse/iceberg/tests/PartitionSpecTest.cpp new file mode 100644 index 000000000000..933fd571bf51 --- /dev/null +++ b/velox/connectors/lakehouse/iceberg/tests/PartitionSpecTest.cpp @@ -0,0 +1,189 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/connectors/lakehouse/iceberg/PartitionSpec.h" +#include "velox/connectors/lakehouse/iceberg/tests/IcebergTestBase.h" + +namespace facebook::velox::connector::lakehouse::iceberg::test { + +/* Fixture for the PartitionSpec serialization tests. SetUp initializes the memory manager only when testInstance() reports none, creates a root pool and a leaf pool, and registers the PartitionSpec deserializers. TearDown releases the leaf pool before the root pool. pool() exposes the leaf pool, which the tests pass as the context argument of create(). */ class PartitionSpecTest : public ::testing::Test { + public: + void SetUp() override { + // Initialize memory pools. + if (!memory::MemoryManager::testInstance()) { + memory::initializeMemoryManager(memory::MemoryManager::Options{}); + } + rootPool_ = memory::memoryManager()->addRootPool("PartitionSpecTest"); + pool_ = rootPool_->addLeafChild("leaf"); + + // Register SerDe for PartitionSpec + IcebergPartitionSpec::registerSerDe(); + } + + void TearDown() override { + pool_.reset(); + rootPool_.reset(); + } + + memory::MemoryPool* pool() { + return pool_.get(); + } + + private: + std::shared_ptr rootPool_; + std::shared_ptr pool_; +}; + +/* Round-trips a single Field twice: once with a parameter (bucket, 10) and once without (identity). Each pass checks the serialized keys first and then the members of the object returned by Field::create. NOTE(review): EXPECT_TRUE(has_value()) does not stop the test, so the value() call after it still runs if the optional is empty; consider ASSERT_TRUE. */ TEST_F(PartitionSpecTest, fieldSerializeRoundTrip) { + // Test Field with parameter + IcebergPartitionSpec::Field fieldWithParam( + "bucket_column", TransformType::kBucket, 10); + auto serialized = fieldWithParam.serialize(); + + EXPECT_EQ(serialized["name"].asString(), "Field"); + EXPECT_EQ(serialized["fieldName"].asString(), "bucket_column"); + EXPECT_EQ(serialized["transformType"].asString(), "bucket"); + EXPECT_EQ(serialized["parameter"].asInt(), 10); + + auto deserialized = IcebergPartitionSpec::Field::create(serialized, pool()); + auto field = + std::static_pointer_cast(deserialized); + EXPECT_EQ(field->name, "bucket_column"); + EXPECT_EQ(field->transformType, TransformType::kBucket); + 
EXPECT_TRUE(field->parameter.has_value()); + EXPECT_EQ(field->parameter.value(), 10); + + // Test Field without parameter + IcebergPartitionSpec::Field fieldWithoutParam( + "identity_column", TransformType::kIdentity, std::nullopt); + auto serialized2 = fieldWithoutParam.serialize(); + + EXPECT_EQ(serialized2["name"].asString(), "Field"); + EXPECT_EQ(serialized2["fieldName"].asString(), "identity_column"); + EXPECT_EQ(serialized2["transformType"].asString(), "identity"); + EXPECT_TRUE(serialized2["parameter"].isNull()); + + auto deserialized2 = IcebergPartitionSpec::Field::create(serialized2, pool()); + auto field2 = std::static_pointer_cast( + deserialized2); + EXPECT_EQ(field2->name, "identity_column"); + EXPECT_EQ(field2->transformType, TransformType::kIdentity); + EXPECT_FALSE(field2->parameter.has_value()); +} + +/* Round-trips a spec with id 42 and four fields (day, bucket 16, truncate 10, identity). It checks the serialized object field by field, then checks that the spec returned by IcebergPartitionSpec::create has the same id and the same fields in the same order. */ TEST_F(PartitionSpecTest, partitionSpecSerializeRoundTrip) { + // Create a partition spec with multiple fields + std::vector fields; + fields.emplace_back("date_column", TransformType::kDay, std::nullopt); + fields.emplace_back("bucket_column", TransformType::kBucket, 16); + fields.emplace_back("truncate_column", TransformType::kTruncate, 10); + fields.emplace_back( + "identity_column", TransformType::kIdentity, std::nullopt); + + IcebergPartitionSpec spec(42, fields); + + // Serialize + auto serialized = spec.serialize(); + EXPECT_EQ(serialized["name"].asString(), "IcebergPartitionSpec"); + EXPECT_EQ(serialized["specId"].asInt(), 42); + EXPECT_TRUE(serialized["fields"].isArray()); + EXPECT_EQ(serialized["fields"].size(), 4); + + // Verify individual fields in serialized output + auto& serializedFields = serialized["fields"]; + EXPECT_EQ(serializedFields[0]["fieldName"].asString(), "date_column"); + EXPECT_EQ(serializedFields[0]["transformType"].asString(), "day"); + EXPECT_TRUE(serializedFields[0]["parameter"].isNull()); + + EXPECT_EQ(serializedFields[1]["fieldName"].asString(), "bucket_column"); + 
EXPECT_EQ(serializedFields[1]["transformType"].asString(), "bucket"); + EXPECT_EQ(serializedFields[1]["parameter"].asInt(), 16); + + EXPECT_EQ(serializedFields[2]["fieldName"].asString(), "truncate_column"); + EXPECT_EQ(serializedFields[2]["transformType"].asString(), "truncate"); + EXPECT_EQ(serializedFields[2]["parameter"].asInt(), 10); + + EXPECT_EQ(serializedFields[3]["fieldName"].asString(), "identity_column"); + EXPECT_EQ(serializedFields[3]["transformType"].asString(), "identity"); + EXPECT_TRUE(serializedFields[3]["parameter"].isNull()); + + // Deserialize + auto deserialized = IcebergPartitionSpec::create(serialized, pool()); + auto deserializedSpec = + std::static_pointer_cast(deserialized); + EXPECT_EQ(deserializedSpec->specId, 42); + EXPECT_EQ(deserializedSpec->fields.size(), 4); + + // Verify deserialized fields + const auto& deserializedFields = deserializedSpec->fields; + + EXPECT_EQ(deserializedFields[0].name, "date_column"); + EXPECT_EQ(deserializedFields[0].transformType, TransformType::kDay); + EXPECT_FALSE(deserializedFields[0].parameter.has_value()); + + EXPECT_EQ(deserializedFields[1].name, "bucket_column"); + EXPECT_EQ(deserializedFields[1].transformType, TransformType::kBucket); + EXPECT_TRUE(deserializedFields[1].parameter.has_value()); + EXPECT_EQ(deserializedFields[1].parameter.value(), 16); + + EXPECT_EQ(deserializedFields[2].name, "truncate_column"); + EXPECT_EQ(deserializedFields[2].transformType, TransformType::kTruncate); + EXPECT_TRUE(deserializedFields[2].parameter.has_value()); + EXPECT_EQ(deserializedFields[2].parameter.value(), 10); + + EXPECT_EQ(deserializedFields[3].name, "identity_column"); + EXPECT_EQ(deserializedFields[3].transformType, TransformType::kIdentity); + EXPECT_FALSE(deserializedFields[3].parameter.has_value()); +} + +/* Checks the boundary case of a spec with id 0 and no fields: serialize() must emit an empty fields array, and create() must return a spec with id 0 and an empty field vector. */ TEST_F(PartitionSpecTest, emptyPartitionSpec) { + // Test with empty fields + std::vector emptyFields; + IcebergPartitionSpec emptySpec(0, emptyFields); + + auto serialized = emptySpec.serialize(); 
+ EXPECT_EQ(serialized["name"].asString(), "IcebergPartitionSpec"); + EXPECT_EQ(serialized["specId"].asInt(), 0); + EXPECT_TRUE(serialized["fields"].isArray()); + EXPECT_EQ(serialized["fields"].size(), 0); + + auto deserialized = IcebergPartitionSpec::create(serialized, pool()); + auto spec = + std::static_pointer_cast(deserialized); + EXPECT_EQ(spec->specId, 0); + EXPECT_EQ(spec->fields.size(), 0); +} + +/* Checks that create() throws VeloxException when a required key is absent. Only the missing fieldName and missing specId paths are exercised here; wrong value types and unknown transform names are not covered by this test. */ TEST_F(PartitionSpecTest, serializationErrors) { + // Test invalid Field creation + folly::dynamic invalidField = folly::dynamic::object; + invalidField["name"] = "Field"; + // Missing fieldName should cause error + EXPECT_THROW( + IcebergPartitionSpec::Field::create(invalidField, pool()), + VeloxException); + + // Test invalid PartitionSpec creation + folly::dynamic invalidSpec = folly::dynamic::object; + invalidSpec["name"] = "IcebergPartitionSpec"; + // Missing specId should cause error + EXPECT_THROW( + IcebergPartitionSpec::create(invalidSpec, pool()), VeloxException); +} + +} // namespace facebook::velox::connector::lakehouse::iceberg::test