diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc index d1138b787..7804ebd58 100644 --- a/src/iceberg/catalog/rest/http_client.cc +++ b/src/iceberg/catalog/rest/http_client.cc @@ -135,13 +135,12 @@ Status HandleFailureResponse(const cpr::Response& response, } // namespace void HttpClient::PrepareSession( - const std::string& path, - const std::unordered_map& request_headers, - const std::unordered_map& params) { + const std::string& path, const std::unordered_map& params, + const std::unordered_map& headers) { session_->SetUrl(cpr::Url{path}); session_->SetParameters(GetParameters(params)); session_->RemoveContent(); - auto final_headers = MergeHeaders(default_headers_, request_headers); + auto final_headers = MergeHeaders(default_headers_, headers); session_->SetHeader(final_headers); } @@ -164,7 +163,7 @@ Result HttpClient::Get( cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers, params); + PrepareSession(path, params, headers); response = session_->Get(); } @@ -181,7 +180,7 @@ Result HttpClient::Post( cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers); + PrepareSession(path, /*params=*/{}, headers); session_->SetBody(cpr::Body{body}); response = session_->Post(); } @@ -206,7 +205,7 @@ Result HttpClient::PostForm( auto form_headers = headers; form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded; - PrepareSession(path, form_headers); + PrepareSession(path, /*params=*/{}, form_headers); std::vector pair_list; pair_list.reserve(form_data.size()); for (const auto& [key, val] : form_data) { @@ -229,7 +228,7 @@ Result HttpClient::Head( cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers); + PrepareSession(path, /*params=*/{}, headers); response = session_->Head(); } @@ -240,12 +239,13 @@ Result HttpClient::Head( } Result HttpClient::Delete( - const std::string& path, const std::unordered_map& headers, + const std::string& path, const std::unordered_map& params, + const std::unordered_map& headers, const ErrorHandler& error_handler) { cpr::Response response; { std::lock_guard guard(session_mutex_); - PrepareSession(path, headers); + PrepareSession(path, params, headers); response = session_->Delete(); } diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h index 56c9f2902..a1401b631 100644 --- a/src/iceberg/catalog/rest/http_client.h +++ b/src/iceberg/catalog/rest/http_client.h @@ -104,13 +104,14 @@ class ICEBERG_REST_EXPORT HttpClient { /// \brief Sends a DELETE request. Result Delete(const std::string& path, + const std::unordered_map& params, const std::unordered_map& headers, const ErrorHandler& error_handler); private: void PrepareSession(const std::string& path, - const std::unordered_map& request_headers, - const std::unordered_map& params = {}); + const std::unordered_map& params, + const std::unordered_map& headers); std::unordered_map default_headers_; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index eeffffd26..2c28691b1 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -74,6 +74,33 @@ Result FetchServerConfig(const ResourcePaths& paths, return CatalogConfigFromJson(json); } +#define ICEBERG_ENDPOINT_CHECK(endpoints, endpoint) \ + do { \ + if (!endpoints.contains(endpoint)) { \ + return NotSupported("Not supported endpoint: {}", endpoint.ToString()); \ + } \ + } while (0) + +Result CaptureNoSuchObject(const auto& status, ErrorKind kind) { + ICEBERG_DCHECK(kind == ErrorKind::kNoSuchTable || kind == ErrorKind::kNoSuchNamespace, + "Invalid kind for CaptureNoSuchObject"); + if (status.has_value()) { + return true; + } + if (status.error().kind == kind) { + return false; + } + return std::unexpected(status.error()); +} + +Result CaptureNoSuchTable(const auto& status) { + return CaptureNoSuchObject(status, ErrorKind::kNoSuchTable); +} + +Result CaptureNoSuchNamespace(const auto& status) { + return CaptureNoSuchObject(status, ErrorKind::kNoSuchNamespace); +} + } // namespace RestCatalog::~RestCatalog() = default; @@ -126,9 +153,7 @@ RestCatalog::RestCatalog(std::unique_ptr config, std::string_view RestCatalog::name() const { return name_; } Result> RestCatalog::ListNamespaces(const Namespace& ns) const { - ICEBERG_RETURN_UNEXPECTED( - CheckEndpoint(supported_endpoints_, Endpoint::ListNamespaces())); - + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces()); std::vector result; std::string next_token; @@ -157,9 +182,7 @@ Result> RestCatalog::ListNamespaces(const Namespace& ns) Status RestCatalog::CreateNamespace( const Namespace& ns, const std::unordered_map& properties) { - ICEBERG_RETURN_UNEXPECTED( - CheckEndpoint(supported_endpoints_, Endpoint::CreateNamespace())); - + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateNamespace()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces()); CreateNamespaceRequest request{.namespace_ = ns, .properties = properties}; ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request))); @@ -173,9 +196,7 @@ Status RestCatalog::CreateNamespace( Result> RestCatalog::GetNamespaceProperties( const Namespace& ns) const { - ICEBERG_RETURN_UNEXPECTED( - CheckEndpoint(supported_endpoints_, Endpoint::GetNamespaceProperties())); - + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::GetNamespaceProperties()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns)); ICEBERG_ASSIGN_OR_RAISE(const auto response, client_->Get(path, /*params=*/{}, /*headers=*/{}, @@ -186,48 +207,29 @@ Result> RestCatalog::GetNamespacePr } Status RestCatalog::DropNamespace(const Namespace& ns) { - ICEBERG_RETURN_UNEXPECTED( - CheckEndpoint(supported_endpoints_, Endpoint::DropNamespace())); + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DropNamespace()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns)); - ICEBERG_ASSIGN_OR_RAISE( - const auto response, - client_->Delete(path, /*headers=*/{}, *DropNamespaceErrorHandler::Instance())); + ICEBERG_ASSIGN_OR_RAISE(const auto response, + client_->Delete(path, /*params=*/{}, /*headers=*/{}, + *DropNamespaceErrorHandler::Instance())); return {}; } Result RestCatalog::NamespaceExists(const Namespace& ns) const { - auto check = CheckEndpoint(supported_endpoints_, Endpoint::NamespaceExists()); - if (!check.has_value()) { + if (!supported_endpoints_.contains(Endpoint::NamespaceExists())) { // Fall back to GetNamespaceProperties - auto result = GetNamespaceProperties(ns); - if (!result.has_value() && result.error().kind == ErrorKind::kNoSuchNamespace) { - return false; - } - ICEBERG_RETURN_UNEXPECTED(result); - // GET succeeded, namespace exists - return true; + return CaptureNoSuchNamespace(GetNamespaceProperties(ns)); } ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns)); - auto response_or_error = - client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()); - if (!response_or_error.has_value()) { - const auto& error = response_or_error.error(); - // catch NoSuchNamespaceException/404 and return false - if (error.kind == ErrorKind::kNoSuchNamespace) { - return false; - } - ICEBERG_RETURN_UNEXPECTED(response_or_error); - } - return true; + return CaptureNoSuchNamespace( + client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance())); } Status RestCatalog::UpdateNamespaceProperties( const Namespace& ns, const std::unordered_map& updates, const std::unordered_set& removals) { - ICEBERG_RETURN_UNEXPECTED( - CheckEndpoint(supported_endpoints_, Endpoint::UpdateNamespace())); - + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateNamespace()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->NamespaceProperties(ns)); UpdateNamespacePropertiesRequest request{ .removals = std::vector(removals.begin(), removals.end()), @@ -252,7 +254,7 @@ Result> RestCatalog::CreateTable( const std::shared_ptr& spec, const std::shared_ptr& order, const std::string& location, const std::unordered_map& properties) { - ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::CreateTable())); + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns)); CreateTableRequest request{ @@ -294,14 +296,29 @@ Result> RestCatalog::StageCreateTable( return NotImplemented("Not implemented"); } -Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier, - [[maybe_unused]] bool purge) { - return NotImplemented("Not implemented"); +Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DeleteTable()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + + std::unordered_map params; + if (purge) { + params["purgeRequested"] = "true"; + } + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance())); + return {}; } -Result RestCatalog::TableExists( - [[maybe_unused]] const TableIdentifier& identifier) const { - return NotImplemented("Not implemented"); +Result RestCatalog::TableExists(const TableIdentifier& identifier) const { + if (!supported_endpoints_.contains(Endpoint::TableExists())) { + // Fall back to call LoadTable + return CaptureNoSuchTable(LoadTableInternal(identifier)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + return CaptureNoSuchTable( + client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance())); } Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from, @@ -309,9 +326,27 @@ Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from, return NotImplemented("Not implemented"); } -Result> RestCatalog::LoadTable( - [[maybe_unused]] const TableIdentifier& identifier) { - return NotImplemented("Not implemented"); +Result RestCatalog::LoadTableInternal( + const TableIdentifier& identifier) const { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance())); + return response.body(); +} + +Result> RestCatalog::LoadTable(const TableIdentifier& identifier) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + + ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); + ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); + + return Table::Make(identifier, std::move(load_result.metadata), + std::move(load_result.metadata_location), file_io_, + shared_from_this()); } Result> RestCatalog::RegisterTable( diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index a80965211..41928cf7b 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -108,6 +108,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, std::shared_ptr file_io, std::unique_ptr paths, std::unordered_set endpoints); + Result LoadTableInternal(const TableIdentifier& identifier) const; + std::unique_ptr config_; std::shared_ptr file_io_; std::unique_ptr client_; diff --git a/src/iceberg/catalog/rest/rest_util.cc b/src/iceberg/catalog/rest/rest_util.cc index 62b9bfc33..18b94195c 100644 --- a/src/iceberg/catalog/rest/rest_util.cc +++ b/src/iceberg/catalog/rest/rest_util.cc @@ -253,12 +253,4 @@ std::string GetStandardReasonPhrase(int32_t status_code) { } } -Status CheckEndpoint(const std::unordered_set& supported_endpoints, - const Endpoint& endpoint) { - if (!supported_endpoints.contains(endpoint)) { - return NotSupported("Server does not support endpoint: {}", endpoint.ToString()); - } - return {}; -} - } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_util.h b/src/iceberg/catalog/rest/rest_util.h index 5734bbc72..ffc941ea5 100644 --- a/src/iceberg/catalog/rest/rest_util.h +++ b/src/iceberg/catalog/rest/rest_util.h @@ -93,12 +93,4 @@ ICEBERG_REST_EXPORT std::unordered_map MergeConfigs( /// Error"). ICEBERG_REST_EXPORT std::string GetStandardReasonPhrase(int32_t status_code); -/// \brief Check whether the given endpoint is in the set of supported endpoints. -/// -/// \param supported_endpoints Set of endpoints advertised by the server -/// \param endpoint Endpoint to validate -/// \return Status::OK if supported, NotSupported error otherwise -ICEBERG_REST_EXPORT Status CheckEndpoint( - const std::unordered_set& supported_endpoints, const Endpoint& endpoint); - } // namespace iceberg::rest diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc index fb1f70610..529690402 100644 --- a/src/iceberg/test/rest_catalog_test.cc +++ b/src/iceberg/test/rest_catalog_test.cc @@ -136,6 +136,14 @@ class RestCatalogIntegrationTest : public ::testing::Test { return RestCatalog::Make(*config, std::make_shared()); } + // Helper function to create a default schema for testing + std::shared_ptr CreateDefaultSchema() { + return std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "data", string())}, + /*schema_id=*/1); + } + static inline std::unique_ptr docker_compose_; }; @@ -288,7 +296,7 @@ TEST_F(RestCatalogIntegrationTest, NamespaceExists) { // Check it now exists exists_result = catalog->NamespaceExists(ns); ASSERT_THAT(exists_result, IsOk()); - EXPECT_TRUE(*exists_result); + EXPECT_TRUE(exists_result.value()); } TEST_F(RestCatalogIntegrationTest, UpdateNamespaceProperties) { @@ -340,7 +348,19 @@ TEST_F(RestCatalogIntegrationTest, DropNamespace) { // Verify it no longer exists exists_result = catalog->NamespaceExists(ns); ASSERT_THAT(exists_result, IsOk()); - EXPECT_FALSE(*exists_result); + EXPECT_FALSE(exists_result.value()); +} + +TEST_F(RestCatalogIntegrationTest, DropNonExistentNamespace) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + Namespace ns{.levels = {"nonexistent_namespace"}}; + auto status = catalog->DropNamespace(ns); + + // Should return NoSuchNamespace error + EXPECT_THAT(status, IsError(ErrorKind::kNoSuchNamespace)); } TEST_F(RestCatalogIntegrationTest, CreateTable) { @@ -362,22 +382,9 @@ TEST_F(RestCatalogIntegrationTest, CreateTable) { status = catalog->CreateNamespace(ns, ns_properties); EXPECT_THAT(status, IsOk()); - // Create schema - auto schema = std::make_shared( - std::vector{SchemaField::MakeOptional(1, "foo", string()), - SchemaField::MakeRequired(2, "bar", int32()), - SchemaField::MakeOptional(3, "baz", boolean())}, - /*schema_id=*/1); - - // Create partition spec and sort order (unpartitioned and unsorted) - auto partition_spec_result = PartitionSpec::Make(PartitionSpec::kInitialSpecId, {}, 0); - ASSERT_THAT(partition_spec_result, IsOk()); - auto partition_spec = std::shared_ptr(std::move(*partition_spec_result)); - - auto sort_order_result = - SortOrder::Make(SortOrder::kUnsortedOrderId, std::vector{}); - ASSERT_THAT(sort_order_result, IsOk()); - auto sort_order = std::shared_ptr(std::move(*sort_order_result)); + auto schema = CreateDefaultSchema(); + auto partition_spec = PartitionSpec::Unpartitioned(); + auto sort_order = SortOrder::Unsorted(); // Create table TableIdentifier table_id{.ns = ns, .name = "t1"}; @@ -400,4 +407,81 @@ TEST_F(RestCatalogIntegrationTest, CreateTable) { HasErrorMessage("Table already exists: test_create_table.apple.ios.t1")); } +TEST_F(RestCatalogIntegrationTest, LoadTable) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + // Create namespace and table first + Namespace ns{.levels = {"test_load_table"}}; + auto status = catalog->CreateNamespace(ns, {}); + EXPECT_THAT(status, IsOk()); + + // Create schema, partition spec, and sort order using helper functions + auto schema = CreateDefaultSchema(); + auto partition_spec = PartitionSpec::Unpartitioned(); + auto sort_order = SortOrder::Unsorted(); + + // Create table + TableIdentifier table_id{.ns = ns, .name = "test_table"}; + std::unordered_map table_properties{{"key1", "value1"}}; + auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, + "", table_properties); + ASSERT_THAT(create_result, IsOk()); + + // Load the table + auto load_result = catalog->LoadTable(table_id); + ASSERT_THAT(load_result, IsOk()); + auto& loaded_table = load_result.value(); + + // Verify loaded table properties + EXPECT_EQ(loaded_table->name().ns.levels, std::vector{"test_load_table"}); + EXPECT_EQ(loaded_table->name().name, "test_table"); + EXPECT_NE(loaded_table->metadata(), nullptr); + + // Verify schema + auto loaded_schema_result = loaded_table->schema(); + ASSERT_THAT(loaded_schema_result, IsOk()); + auto loaded_schema = loaded_schema_result.value(); + EXPECT_EQ(loaded_schema->fields().size(), 2); + EXPECT_EQ(loaded_schema->fields()[0].name(), "id"); + EXPECT_EQ(loaded_schema->fields()[1].name(), "data"); +} + +TEST_F(RestCatalogIntegrationTest, DropTable) { + auto catalog_result = CreateCatalog(); + ASSERT_THAT(catalog_result, IsOk()); + auto& catalog = catalog_result.value(); + + // Create namespace and table first + Namespace ns{.levels = {"test_drop_table"}}; + auto status = catalog->CreateNamespace(ns, {}); + EXPECT_THAT(status, IsOk()); + + // Create table + auto schema = CreateDefaultSchema(); + auto partition_spec = PartitionSpec::Unpartitioned(); + auto sort_order = SortOrder::Unsorted(); + + TableIdentifier table_id{.ns = ns, .name = "table_to_drop"}; + std::unordered_map table_properties; + auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, + "", table_properties); + ASSERT_THAT(create_result, IsOk()); + + // Verify table exists + auto load_result = catalog->TableExists(table_id); + ASSERT_THAT(load_result, IsOk()); + EXPECT_TRUE(load_result.value()); + + // Drop the table + status = catalog->DropTable(table_id, /*purge=*/false); + ASSERT_THAT(status, IsOk()); + + // Verify table no longer exists + load_result = catalog->TableExists(table_id); + ASSERT_THAT(load_result, IsOk()); + EXPECT_FALSE(load_result.value()); +} + } // namespace iceberg::rest diff --git a/src/iceberg/test/rest_util_test.cc b/src/iceberg/test/rest_util_test.cc index 5d2abf558..e11f00154 100644 --- a/src/iceberg/test/rest_util_test.cc +++ b/src/iceberg/test/rest_util_test.cc @@ -154,20 +154,4 @@ TEST(RestUtilTest, MergeConfigs) { EXPECT_EQ(merged_empty["key"], "value"); } -TEST(RestUtilTest, CheckEndpointSupported) { - std::unordered_set supported = { - Endpoint::ListNamespaces(), Endpoint::LoadTable(), Endpoint::CreateTable()}; - - // Supported endpoints should pass - EXPECT_THAT(CheckEndpoint(supported, Endpoint::ListNamespaces()), IsOk()); - EXPECT_THAT(CheckEndpoint(supported, Endpoint::LoadTable()), IsOk()); - EXPECT_THAT(CheckEndpoint(supported, Endpoint::CreateTable()), IsOk()); - - // Unsupported endpoints should fail - EXPECT_THAT(CheckEndpoint(supported, Endpoint::DeleteTable()), - IsError(ErrorKind::kNotSupported)); - EXPECT_THAT(CheckEndpoint(supported, Endpoint::UpdateTable()), - IsError(ErrorKind::kNotSupported)); -} - } // namespace iceberg::rest