WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/iceberg/catalog/rest/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ Status HandleFailureResponse(const cpr::Response& response,
} // namespace

void HttpClient::PrepareSession(
const std::string& path,
const std::unordered_map<std::string, std::string>& request_headers,
const std::unordered_map<std::string, std::string>& params) {
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers) {
session_->SetUrl(cpr::Url{path});
session_->SetParameters(GetParameters(params));
session_->RemoveContent();
auto final_headers = MergeHeaders(default_headers_, request_headers);
auto final_headers = MergeHeaders(default_headers_, headers);
session_->SetHeader(final_headers);
}

Expand All @@ -164,7 +163,7 @@ Result<HttpResponse> HttpClient::Get(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers, params);
PrepareSession(path, params, headers);
response = session_->Get();
}

Expand All @@ -181,7 +180,7 @@ Result<HttpResponse> HttpClient::Post(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers);
PrepareSession(path, /*params=*/{}, headers);
session_->SetBody(cpr::Body{body});
response = session_->Post();
}
Expand All @@ -206,7 +205,7 @@ Result<HttpResponse> HttpClient::PostForm(
auto form_headers = headers;
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;

PrepareSession(path, form_headers);
PrepareSession(path, /*params=*/{}, form_headers);
std::vector<cpr::Pair> pair_list;
pair_list.reserve(form_data.size());
for (const auto& [key, val] : form_data) {
Expand All @@ -229,7 +228,7 @@ Result<HttpResponse> HttpClient::Head(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers);
PrepareSession(path, /*params=*/{}, headers);
response = session_->Head();
}

Expand All @@ -240,12 +239,13 @@ Result<HttpResponse> HttpClient::Head(
}

Result<HttpResponse> HttpClient::Delete(
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers);
PrepareSession(path, params, headers);
response = session_->Delete();
}

Expand Down
5 changes: 3 additions & 2 deletions src/iceberg/catalog/rest/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ class ICEBERG_REST_EXPORT HttpClient {

/// \brief Sends a DELETE request.
Result<HttpResponse> Delete(const std::string& path,
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler);

private:
void PrepareSession(const std::string& path,
const std::unordered_map<std::string, std::string>& request_headers,
const std::unordered_map<std::string, std::string>& params = {});
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers);

std::unordered_map<std::string, std::string> default_headers_;

Expand Down
129 changes: 82 additions & 47 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,33 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
return CatalogConfigFromJson(json);
}

#define ICEBERG_ENDPOINT_CHECK(endpoints, endpoint) \
do { \
if (!endpoints.contains(endpoint)) { \
return NotSupported("Not supported endpoint: {}", endpoint.ToString()); \
} \
} while (0)

Result<bool> CaptureNoSuchObject(const auto& status, ErrorKind kind) {
ICEBERG_DCHECK(kind == ErrorKind::kNoSuchTable || kind == ErrorKind::kNoSuchNamespace,
"Invalid kind for CaptureNoSuchObject");
if (status.has_value()) {
return true;
}
if (status.error().kind == kind) {
return false;
}
return std::unexpected(status.error());
}

Result<bool> CaptureNoSuchTable(const auto& status) {
return CaptureNoSuchObject(status, ErrorKind::kNoSuchTable);
}

Result<bool> CaptureNoSuchNamespace(const auto& status) {
return CaptureNoSuchObject(status, ErrorKind::kNoSuchNamespace);
}

} // namespace

RestCatalog::~RestCatalog() = default;
Expand Down Expand Up @@ -126,9 +153,7 @@ RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::string_view RestCatalog::name() const { return name_; }

Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
ICEBERG_RETURN_UNEXPECTED(
CheckEndpoint(supported_endpoints_, Endpoint::ListNamespaces()));

ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
std::vector<Namespace> result;
std::string next_token;
Expand Down Expand Up @@ -157,9 +182,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)

Status RestCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
ICEBERG_RETURN_UNEXPECTED(
CheckEndpoint(supported_endpoints_, Endpoint::CreateNamespace()));

ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
Expand All @@ -173,9 +196,7 @@ Status RestCatalog::CreateNamespace(

Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespaceProperties(
const Namespace& ns) const {
ICEBERG_RETURN_UNEXPECTED(
CheckEndpoint(supported_endpoints_, Endpoint::GetNamespaceProperties()));

ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::GetNamespaceProperties());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{},
Expand All @@ -186,48 +207,29 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
}

Status RestCatalog::DropNamespace(const Namespace& ns) {
ICEBERG_RETURN_UNEXPECTED(
CheckEndpoint(supported_endpoints_, Endpoint::DropNamespace()));
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DropNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(path, /*headers=*/{}, *DropNamespaceErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Delete(path, /*params=*/{}, /*headers=*/{},
*DropNamespaceErrorHandler::Instance()));
return {};
}

Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
auto check = CheckEndpoint(supported_endpoints_, Endpoint::NamespaceExists());
if (!check.has_value()) {
if (!supported_endpoints_.contains(Endpoint::NamespaceExists())) {
// Fall back to GetNamespaceProperties
auto result = GetNamespaceProperties(ns);
if (!result.has_value() && result.error().kind == ErrorKind::kNoSuchNamespace) {
return false;
}
ICEBERG_RETURN_UNEXPECTED(result);
// GET succeeded, namespace exists
return true;
return CaptureNoSuchNamespace(GetNamespaceProperties(ns));
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
auto response_or_error =
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance());
if (!response_or_error.has_value()) {
const auto& error = response_or_error.error();
// catch NoSuchNamespaceException/404 and return false
if (error.kind == ErrorKind::kNoSuchNamespace) {
return false;
}
ICEBERG_RETURN_UNEXPECTED(response_or_error);
}
return true;
return CaptureNoSuchNamespace(
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
}

Status RestCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
ICEBERG_RETURN_UNEXPECTED(
CheckEndpoint(supported_endpoints_, Endpoint::UpdateNamespace()));

ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateNamespace());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->NamespaceProperties(ns));
UpdateNamespacePropertiesRequest request{
.removals = std::vector<std::string>(removals.begin(), removals.end()),
Expand All @@ -252,7 +254,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::CreateTable()));
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));

CreateTableRequest request{
Expand Down Expand Up @@ -294,24 +296,57 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
return NotImplemented("Not implemented");
}

Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] bool purge) {
return NotImplemented("Not implemented");
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DeleteTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

std::unordered_map<std::string, std::string> params;
if (purge) {
params["purgeRequested"] = "true";
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
return {};
}

Result<bool> RestCatalog::TableExists(
[[maybe_unused]] const TableIdentifier& identifier) const {
return NotImplemented("Not implemented");
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
if (!supported_endpoints_.contains(Endpoint::TableExists())) {
// Fall back to call LoadTable
return CaptureNoSuchTable(LoadTableInternal(identifier));
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
return CaptureNoSuchTable(
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
}

Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
[[maybe_unused]] const TableIdentifier& to) {
return NotImplemented("Not implemented");
}

Result<std::shared_ptr<Table>> RestCatalog::LoadTable(
[[maybe_unused]] const TableIdentifier& identifier) {
return NotImplemented("Not implemented");
Result<std::string> RestCatalog::LoadTableInternal(
const TableIdentifier& identifier) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
return response.body();
}

Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));

return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
shared_from_this());
}

Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints);

Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;

std::unique_ptr<RestCatalogProperties> config_;
std::shared_ptr<FileIO> file_io_;
std::unique_ptr<HttpClient> client_;
Expand Down
8 changes: 0 additions & 8 deletions src/iceberg/catalog/rest/rest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,4 @@ std::string GetStandardReasonPhrase(int32_t status_code) {
}
}

Status CheckEndpoint(const std::unordered_set<Endpoint>& supported_endpoints,
const Endpoint& endpoint) {
if (!supported_endpoints.contains(endpoint)) {
return NotSupported("Server does not support endpoint: {}", endpoint.ToString());
}
return {};
}

} // namespace iceberg::rest
8 changes: 0 additions & 8 deletions src/iceberg/catalog/rest/rest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,4 @@ ICEBERG_REST_EXPORT std::unordered_map<std::string, std::string> MergeConfigs(
/// Error").
ICEBERG_REST_EXPORT std::string GetStandardReasonPhrase(int32_t status_code);

/// \brief Check whether the given endpoint is in the set of supported endpoints.
///
/// \param supported_endpoints Set of endpoints advertised by the server
/// \param endpoint Endpoint to validate
/// \return Status::OK if supported, NotSupported error otherwise
ICEBERG_REST_EXPORT Status CheckEndpoint(
const std::unordered_set<Endpoint>& supported_endpoints, const Endpoint& endpoint);

} // namespace iceberg::rest
Loading
Loading