Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ constexpr std::string_view kSource = "source";
constexpr std::string_view kDestination = "destination";
constexpr std::string_view kMetadata = "metadata";
constexpr std::string_view kConfig = "config";
constexpr std::string_view kStorageCredentials = "storage-credentials";
constexpr std::string_view kPrefix = "prefix";
constexpr std::string_view kIdentifiers = "identifiers";
constexpr std::string_view kOverrides = "overrides";
constexpr std::string_view kDefaults = "defaults";
Expand Down Expand Up @@ -689,12 +691,32 @@ Result<RenameTableRequest> RenameTableRequestFromJson(const nlohmann::json& json
return request;
}

// StorageCredential (used by LoadTableResult)
nlohmann::json ToJson(const StorageCredential& credential) {
nlohmann::json json;
json[kPrefix] = credential.prefix;
SetContainerField(json, kConfig, credential.config);
return json;
}

Result<StorageCredential> StorageCredentialFromJson(const nlohmann::json& json) {
StorageCredential credential;
ICEBERG_ASSIGN_OR_RAISE(credential.prefix, GetJsonValue<std::string>(json, kPrefix));
ICEBERG_ASSIGN_OR_RAISE(
credential.config,
GetJsonValueOrDefault<decltype(credential.config)>(json, kConfig));
return credential;
}

// LoadTableResult (used by CreateTableResponse, LoadTableResponse)
nlohmann::json ToJson(const LoadTableResult& result) {
nlohmann::json json;
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
json[kMetadata] = ToJson(*result.metadata);
SetContainerField(json, kConfig, result.config);
for (const auto& credential : result.storage_credentials) {
json[kStorageCredentials].emplace_back(ToJson(credential));
}
return json;
}

Expand All @@ -707,6 +729,9 @@ Result<LoadTableResult> LoadTableResultFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json));
ICEBERG_ASSIGN_OR_RAISE(result.config,
GetJsonValueOrDefault<decltype(result.config)>(json, kConfig));
ICEBERG_ASSIGN_OR_RAISE(result.storage_credentials,
FromJsonList<StorageCredential>(json, kStorageCredentials,
StorageCredentialFromJson));
ICEBERG_RETURN_UNEXPECTED(result.Validate());
return result;
}
Expand Down
24 changes: 18 additions & 6 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,12 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
ICEBERG_ASSIGN_OR_RAISE(auto result,
CreateTableInternal(identifier, schema, spec, order, location,
properties, /*stage_create=*/false));
ICEBERG_ASSIGN_OR_RAISE(auto file_io,
MakeTableFileIO(config_, file_io_, result.metadata->location,
result.config, result.storage_credentials));
return Table::Make(identifier, std::move(result.metadata),
std::move(result.metadata_location), file_io_, shared_from_this());
std::move(result.metadata_location), std::move(file_io),
shared_from_this());
}

Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
Expand Down Expand Up @@ -409,10 +413,13 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
ICEBERG_ASSIGN_OR_RAISE(auto result,
CreateTableInternal(identifier, schema, spec, order, location,
properties, /*stage_create=*/true));
ICEBERG_ASSIGN_OR_RAISE(auto file_io,
MakeTableFileIO(config_, file_io_, result.metadata->location,
result.config, result.storage_credentials));
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
StagedTable::Make(identifier, std::move(result.metadata),
std::move(result.metadata_location), file_io_,
shared_from_this()));
std::move(result.metadata_location),
std::move(file_io), shared_from_this()));
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
}

Expand Down Expand Up @@ -479,9 +486,11 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
/// FIXME: support per-table FileIO creation
ICEBERG_ASSIGN_OR_RAISE(
auto file_io, MakeTableFileIO(config_, file_io_, load_result.metadata->location,
load_result.config, load_result.storage_credentials));
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
std::move(load_result.metadata_location), std::move(file_io),
shared_from_this());
}

Expand All @@ -503,8 +512,11 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
ICEBERG_ASSIGN_OR_RAISE(
auto file_io, MakeTableFileIO(config_, file_io_, load_result.metadata->location,
load_result.config, load_result.storage_credentials));
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
std::move(load_result.metadata_location), std::move(file_io),
shared_from_this());
}

Expand Down
60 changes: 60 additions & 0 deletions src/iceberg/catalog/rest/rest_file_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

#include "iceberg/catalog/rest/rest_file_io.h"

#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "iceberg/catalog/rest/rest_util.h"
#include "iceberg/file_io_registry.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -92,4 +97,59 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c
return FileIORegistry::Load(io_impl, config.configs());
}

const StorageCredential* MatchStorageCredential(
std::string_view location, const std::vector<StorageCredential>& credentials) {
const StorageCredential* best = nullptr;
for (const auto& credential : credentials) {
if (!location.starts_with(credential.prefix)) {
continue;
}
if (best == nullptr || credential.prefix.size() > best->prefix.size()) {
best = &credential;
}
}
return best;
}

Result<std::shared_ptr<FileIO>> MakeTableFileIO(
const RestCatalogProperties& catalog_config,
const std::shared_ptr<FileIO>& catalog_file_io, std::string_view location,
const std::unordered_map<std::string, std::string>& table_config,
const std::vector<StorageCredential>& storage_credentials) {
const StorageCredential* credential =
MatchStorageCredential(location, storage_credentials);

// Without table-specific overrides, reuse the shared catalog FileIO.
if (table_config.empty() && credential == nullptr) {
return catalog_file_io;
}

// Layer table config, then vended credentials, on top of the catalog properties.
// Vended credentials are the most specific and therefore take precedence.
static const std::unordered_map<std::string, std::string> kEmptyConfig;
auto properties =
MergeConfigs(catalog_config.configs(), table_config,
credential != nullptr ? credential->config : kEmptyConfig);

std::string io_impl;
if (auto it = properties.find(RestCatalogProperties::kIOImpl.key());
it != properties.end()) {
io_impl = it->second;
}
if (io_impl.empty()) {
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(location));
io_impl = std::string(BuiltinFileIOName(detected_kind));
} else if (!location.empty() && IsBuiltinImpl(io_impl)) {
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(location));
if (io_impl != BuiltinFileIOName(detected_kind)) {
return InvalidArgument(
R"("io-impl" value '{}' is incompatible with table location '{}')", io_impl,
location);
}
}

ICEBERG_ASSIGN_OR_RAISE(auto file_io, FileIORegistry::Load(io_impl, properties));
return std::shared_ptr<FileIO>(std::move(file_io));
}

} // namespace iceberg::rest
26 changes: 26 additions & 0 deletions src/iceberg/catalog/rest/rest_file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#include <cstdint>
#include <memory>
#include <string_view>
#include <unordered_map>
#include <vector>

#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/file_io.h"
#include "iceberg/file_io_registry.h"
#include "iceberg/result.h"
Expand All @@ -44,4 +47,27 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind);
ICEBERG_REST_EXPORT Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(
const RestCatalogProperties& config);

/// \brief Select the storage credential whose prefix is the most specific (longest)
/// match for `location`, per the REST spec guidance.
///
/// \return A pointer into `credentials`, or nullptr if none match. The pointer is
/// valid only as long as `credentials` is alive and unmodified.
ICEBERG_REST_EXPORT const StorageCredential* MatchStorageCredential(
std::string_view location, const std::vector<StorageCredential>& credentials);

/// \brief Build a FileIO for a single table from a load/create response.
///
/// Layers the table's vended storage credentials (the most specific prefix match for
/// `location`) and table-specific `table_config` on top of the catalog configuration,
/// then resolves a FileIO implementation. Vended credentials take precedence over
/// `table_config`, which takes precedence over the catalog configuration.
///
/// When the table supplies neither vended credentials nor overriding config,
/// `catalog_file_io` is returned unchanged so the shared instance is reused.
ICEBERG_REST_EXPORT Result<std::shared_ptr<FileIO>> MakeTableFileIO(
const RestCatalogProperties& catalog_config,
const std::shared_ptr<FileIO>& catalog_file_io, std::string_view location,
const std::unordered_map<std::string, std::string>& table_config,
const std::vector<StorageCredential>& storage_credentials);

} // namespace iceberg::rest
3 changes: 2 additions & 1 deletion src/iceberg/catalog/rest/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ bool CreateTableRequest::operator==(const CreateTableRequest& other) const {
}

bool LoadTableResult::operator==(const LoadTableResult& other) const {
if (metadata_location != other.metadata_location || config != other.config) {
if (metadata_location != other.metadata_location || config != other.config ||
storage_credentials != other.storage_credentials) {
return false;
}

Expand Down
19 changes: 18 additions & 1 deletion src/iceberg/catalog/rest/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,29 @@ struct ICEBERG_REST_EXPORT CreateTableRequest {
/// \brief An opaque token that allows clients to make use of pagination for list APIs.
using PageToken = std::string;

/// \brief A storage credential vended by the REST catalog, scoped to a location prefix.
///
/// The REST catalog returns these so clients can access table data without holding
/// long-lived storage credentials. When several credentials are available, clients
/// should pick the one with the most specific (longest) matching prefix.
struct ICEBERG_REST_EXPORT StorageCredential {
/// Storage location prefix this credential applies to (e.g. "s3://bucket/db/table").
std::string prefix; // required
/// Credential properties to layer onto the FileIO configuration (e.g. access key,
/// secret key, session token).
std::unordered_map<std::string, std::string> config;

bool operator==(const StorageCredential&) const = default;
};

/// \brief Result body for table create/load/register APIs.
struct ICEBERG_REST_EXPORT LoadTableResult {
std::string metadata_location;
std::shared_ptr<TableMetadata> metadata; // required
std::unordered_map<std::string, std::string> config;
// TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> storage_credential;
/// Storage credentials for accessing the table's data. Clients should prefer these
/// over any credentials embedded in `config`.
std::vector<StorageCredential> storage_credentials;

/// \brief Validates the LoadTableResult.
Status Validate() const {
Expand Down
70 changes: 70 additions & 0 deletions src/iceberg/test/rest_file_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@

#include "iceberg/catalog/rest/rest_file_io.h"

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/catalog/rest/types.h"
#include "iceberg/file_io_registry.h"
#include "iceberg/test/matchers.h"

Expand Down Expand Up @@ -143,4 +149,68 @@ TEST(RestFileIOTest, MakeCatalogFileIOSkipsCheckWhenWarehouseAbsent) {
ASSERT_THAT(result, IsOk());
}

TEST(RestFileIOTest, MatchStorageCredentialPicksLongestPrefix) {
std::vector<StorageCredential> credentials = {
{.prefix = "s3://bucket", .config = {{"k", "broad"}}},
{.prefix = "s3://bucket/db/table", .config = {{"k", "specific"}}},
{.prefix = "s3://other", .config = {{"k", "other"}}},
};

const auto* match =
MatchStorageCredential("s3://bucket/db/table/data/f.parquet", credentials);
ASSERT_NE(match, nullptr);
EXPECT_EQ(match->prefix, "s3://bucket/db/table");

EXPECT_EQ(MatchStorageCredential("gs://nope/x", credentials), nullptr);
}

TEST(RestFileIOTest, MatchStorageCredentialEmptyReturnsNull) {
EXPECT_EQ(MatchStorageCredential("s3://bucket/x", {}), nullptr);
}

TEST(RestFileIOTest, MakeTableFileIOReusesCatalogIOWhenNoOverrides) {
auto catalog_io = std::make_shared<MockFileIO>();
auto config = RestCatalogProperties::FromMap(
{{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)}});

auto result = MakeTableFileIO(config, catalog_io, "s3://bucket/test",
/*table_config=*/{}, /*storage_credentials=*/{});
ASSERT_THAT(result, IsOk());
EXPECT_EQ(result.value(), catalog_io); // shared catalog instance reused
}

TEST(RestFileIOTest, MakeTableFileIOAppliesVendedCredentials) {
auto captured = std::make_shared<std::unordered_map<std::string, std::string>>();
FileIORegistry::Register(
std::string(FileIORegistry::kArrowS3FileIO),
[captured](const std::unordered_map<std::string, std::string>& properties)
-> Result<std::unique_ptr<FileIO>> {
*captured = properties;
return std::make_unique<MockFileIO>();
});

auto catalog_io = std::make_shared<MockFileIO>();
auto config = RestCatalogProperties::FromMap(
{{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)},
{"s3.access-key-id", "catalog-key"}});

std::vector<StorageCredential> credentials = {
{.prefix = "s3://bucket", .config = {{"s3.access-key-id", "broad"}}},
{.prefix = "s3://bucket/test",
.config = {{"s3.access-key-id", "vended"}, {"s3.session-token", "tok"}}},
};

auto result = MakeTableFileIO(config, catalog_io, "s3://bucket/test/data/f.parquet",
/*table_config=*/{{"write.parquet.compression", "zstd"}},
credentials);
ASSERT_THAT(result, IsOk());
EXPECT_NE(result.value(), catalog_io); // a new, table-scoped FileIO

// The most specific vended credential wins over the catalog value.
EXPECT_EQ((*captured)["s3.access-key-id"], "vended");
EXPECT_EQ((*captured)["s3.session-token"], "tok");
// Table-specific config is layered in as well.
EXPECT_EQ((*captured)["write.parquet.compression"], "zstd");
}

} // namespace iceberg::rest
27 changes: 25 additions & 2 deletions src/iceberg/test/rest_json_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,17 @@ INSTANTIATE_TEST_SUITE_P(
.model = {.metadata_location = "s3://bucket/metadata/v1.json",
.metadata = MakeSimpleTableMetadata(),
.config = {{"warehouse", "s3://bucket/warehouse"},
{"foo", "bar"}}}}),
{"foo", "bar"}}}},
// With vended storage credentials
LoadTableResultParam{
.test_name = "WithStorageCredentials",
.expected_json_str =
R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot-log":[],"snapshots":[],"sort-orders":[{"fields":[],"order-id":0}],"statistics":[],"table-uuid":"test-uuid-1234"},"storage-credentials":[{"config":{"s3.access-key-id":"AKIA","s3.secret-access-key":"secret"},"prefix":"s3://bucket/test"}]})",
.model = {.metadata = MakeSimpleTableMetadata(),
.storage_credentials = {StorageCredential{
.prefix = "s3://bucket/test",
.config = {{"s3.access-key-id", "AKIA"},
{"s3.secret-access-key", "secret"}}}}}}),
[](const ::testing::TestParamInfo<LoadTableResultParam>& info) {
return info.param.test_name;
});
Expand Down Expand Up @@ -1145,7 +1155,20 @@ INSTANTIATE_TEST_SUITE_P(
.json_str =
R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"config":{"warehouse":"s3://bucket/warehouse"}})",
.expected_model = {.metadata = MakeSimpleTableMetadata(),
.config = {{"warehouse", "s3://bucket/warehouse"}}}}),
.config = {{"warehouse", "s3://bucket/warehouse"}}}},
// With multiple vended storage credentials
LoadTableResultDeserializeParam{
.test_name = "WithStorageCredentials",
.json_str =
R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"storage-credentials":[{"prefix":"s3://bucket","config":{"s3.access-key-id":"BROAD"}},{"prefix":"s3://bucket/test","config":{"s3.access-key-id":"AKIA","s3.session-token":"tok"}}]})",
.expected_model =
{.metadata = MakeSimpleTableMetadata(),
.storage_credentials =
{StorageCredential{.prefix = "s3://bucket",
.config = {{"s3.access-key-id", "BROAD"}}},
StorageCredential{.prefix = "s3://bucket/test",
.config = {{"s3.access-key-id", "AKIA"},
{"s3.session-token", "tok"}}}}}}),
[](const ::testing::TestParamInfo<LoadTableResultDeserializeParam>& info) {
return info.param.test_name;
});
Expand Down
Loading