Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/iceberg/test/fast_append_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@
#include "iceberg/update/fast_append.h"

#include <format>
#include <optional>
#include <string>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/avro/avro_register.h"
#include "iceberg/constants.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/test_resource.h"
Expand Down Expand Up @@ -72,6 +79,23 @@ class FastAppendTest : public UpdateTestBase {
return data_file;
}

Result<ManifestFile> WriteManifest(
const std::string& path, const std::vector<std::shared_ptr<DataFile>>& files) {
ICEBERG_ASSIGN_OR_RAISE(
auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version,
kInvalidSnapshotId, path, file_io_, spec_,
schema_, ManifestContent::kData));
for (const auto& file : files) {
ManifestEntry entry;
entry.status = ManifestStatus::kAdded;
entry.snapshot_id = std::nullopt;
entry.data_file = file;
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
}
ICEBERG_RETURN_UNEXPECTED(writer->Close());
return writer->ToManifestFile();
}

std::shared_ptr<PartitionSpec> spec_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<DataFile> file_a_;
Expand All @@ -90,6 +114,9 @@ TEST_F(FastAppendTest, AppendDataFile) {
EXPECT_EQ(snapshot->summary.at("added-data-files"), "1");
EXPECT_EQ(snapshot->summary.at("added-records"), "100");
EXPECT_EQ(snapshot->summary.at("added-files-size"), "1024");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsCreated), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsKept), "0");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsReplaced), "0");
}

TEST_F(FastAppendTest, AppendMultipleDataFiles) {
Expand Down Expand Up @@ -172,6 +199,40 @@ TEST_F(FastAppendTest, FinalizeIgnoresCleanupDeleteFailure) {
IsOk());
}

TEST_F(FastAppendTest, RetryCopiesAppendManifestAgain) {
table_->metadata()->format_version = 1;
const auto path = table_location_ + "/metadata/input.avro";
ICEBERG_UNWRAP_OR_FAIL(auto manifest, WriteManifest(path, {file_a_}));

std::shared_ptr<FastAppend> fast_append;
ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
std::vector<std::string> deleted_paths;
fast_append->DeleteWith([&](const std::string& deleted_path) {
deleted_paths.push_back(deleted_path);
return file_io_->DeleteFile(deleted_path);
});
fast_append->AppendManifest(manifest);

auto& update = static_cast<SnapshotUpdate&>(*fast_append);
// First Apply() copies the input manifest because v1 cannot inherit snapshot IDs.
ICEBERG_UNWRAP_OR_FAIL(auto first_apply, update.Apply());
SnapshotCache first_cache(first_apply.snapshot.get());
ICEBERG_UNWRAP_OR_FAIL(auto first_manifests, first_cache.Manifests(file_io_));
ASSERT_EQ(first_manifests.size(), 1U);
const auto first_rewritten_path = first_manifests[0].manifest_path;
EXPECT_NE(first_rewritten_path, path);

// Second Apply() simulates retry cleanup, then copies the original manifest again.
ICEBERG_UNWRAP_OR_FAIL(auto second_apply, update.Apply());
EXPECT_THAT(deleted_paths, testing::Contains(first_rewritten_path));

SnapshotCache second_cache(second_apply.snapshot.get());
ICEBERG_UNWRAP_OR_FAIL(auto second_manifests, second_cache.Manifests(file_io_));
ASSERT_EQ(second_manifests.size(), 1U);
EXPECT_NE(second_manifests[0].manifest_path, path);
EXPECT_NE(second_manifests[0].manifest_path, first_rewritten_path);
}

TEST_F(FastAppendTest, AppendDuplicateFile) {
std::shared_ptr<FastAppend> fast_append;
ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
Expand Down
51 changes: 41 additions & 10 deletions src/iceberg/update/fast_append.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_util_internal.h"
#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/table.h" // IWYU pragma: keep
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/transaction.h"
Expand Down Expand Up @@ -58,7 +58,7 @@ FastAppend& FastAppend::AppendFile(const std::shared_ptr<DataFile>& file) {
auto [iter, inserted] = data_files.insert(file);
if (inserted) {
has_new_files_ = true;
ICEBERG_BUILDER_RETURN_IF_ERROR(summary_.AddedFile(*spec, *file));
ICEBERG_BUILDER_RETURN_IF_ERROR(added_data_files_summary_.AddedFile(*spec, *file));
}

return *this;
Expand All @@ -75,11 +75,13 @@ FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
"Sequence number must be assigned during commit");

if (can_inherit_snapshot_id() && manifest.added_snapshot_id == kInvalidSnapshotId) {
summary_.AddedManifest(manifest);
appended_manifests_summary_.AddedManifest(manifest);
append_manifests_.push_back(manifest);
} else {
// The manifest must be rewritten with this update's snapshot ID
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest));
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest,
CopyManifest(manifest, /*update_summary=*/true));
append_manifests_to_copy_.push_back(manifest);
rewritten_append_manifests_.push_back(std::move(copied_manifest));
}

Expand All @@ -93,6 +95,16 @@ Result<std::vector<ManifestFile>> FastAppend::Apply(
std::vector<ManifestFile> manifests;

ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests());
// A retry cleanup deletes copied append manifests and clears the rewritten
// list; rebuild them from the original appended manifests before re-applying.
if (rewritten_append_manifests_.empty() && !append_manifests_to_copy_.empty()) {
for (const auto& manifest : append_manifests_to_copy_) {
ICEBERG_ASSIGN_OR_RAISE(auto copied_manifest,
CopyManifest(manifest, /*update_summary=*/false));
rewritten_append_manifests_.push_back(std::move(copied_manifest));
}
}

manifests.reserve(new_written_manifests.size() + append_manifests_.size() +
rewritten_append_manifests_.size());
if (!new_written_manifests.empty()) {
Expand Down Expand Up @@ -122,15 +134,31 @@ Result<std::vector<ManifestFile>> FastAppend::Apply(
snapshot_manifests.end());
}

manifest_count_summary_ =
BuildManifestCountSummary(manifests, /*replaced_manifests_count=*/0);

return manifests;
}

std::unordered_map<std::string, std::string> FastAppend::Summary() {
summary_.Clear();
summary_.SetPartitionSummaryLimit(
base().properties.Get(TableProperties::kWritePartitionSummaryLimit));
summary_.Merge(added_data_files_summary_);
summary_.Merge(appended_manifests_summary_);
for (const auto& [property, value] : custom_summary_properties_) {
summary_.Set(property, value);
}
summary_.Merge(manifest_count_summary_);
return summary_.Build();
}

void FastAppend::SetSummaryProperty(const std::string& property,
const std::string& value) {
custom_summary_properties_[property] = value;
SnapshotUpdate::SetSummaryProperty(property, value);
}

Status FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
// Clean up new manifests that were written but not committed
if (!new_manifests_.empty()) {
Expand All @@ -151,24 +179,26 @@ Status FastAppend::CleanUncommitted(const std::unordered_set<std::string>& commi
std::ignore = DeleteFile(manifest.manifest_path);
}
}
rewritten_append_manifests_.clear();
}
return {};
}

bool FastAppend::CleanupAfterCommit() const {
// Cleanup after committing is disabled for FastAppend unless there are
// rewritten_append_manifests_ because:
// 1.) Appended manifests are never rewritten
// Cleanup after committing is disabled for FastAppend unless append manifests
// were copied or need to be copied on retry because:
// 1.) Directly appended manifests are never rewritten
// 2.) Manifests which are written out as part of AppendFile are already cleaned
// up between commit attempts in WriteNewManifests
return !rewritten_append_manifests_.empty();
return !rewritten_append_manifests_.empty() || !append_manifests_to_copy_.empty();
}

Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
return base().PartitionSpecById(spec_id);
}

Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest,
bool update_summary) {
const TableMetadata& current = base();
ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema());
ICEBERG_ASSIGN_OR_RAISE(auto spec,
Expand All @@ -180,7 +210,8 @@ Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {

// Copy the manifest with the new snapshot ID.
return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, snapshot_id,
new_manifest_path, current.format_version, &summary_);
new_manifest_path, current.format_version,
update_summary ? &appended_manifests_summary_ : nullptr);
}

Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
Expand Down
13 changes: 12 additions & 1 deletion src/iceberg/update/fast_append.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
const TableMetadata& metadata_to_update,
const std::shared_ptr<Snapshot>& snapshot) override;
std::unordered_map<std::string, std::string> Summary() override;
void SetSummaryProperty(const std::string& property, const std::string& value) override;
Status CleanUncommitted(const std::unordered_set<std::string>& committed) override;
bool CleanupAfterCommit() const override;

Expand All @@ -84,8 +85,9 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
/// \brief Copy a manifest file with a new snapshot ID.
///
/// \param manifest The manifest to copy
Comment thread
wgtmac marked this conversation as resolved.
/// \param update_summary Whether to add copied entries to the append summary
/// \return The copied manifest file
Result<ManifestFile> CopyManifest(const ManifestFile& manifest);
Result<ManifestFile> CopyManifest(const ManifestFile& manifest, bool update_summary);
Comment thread
wgtmac marked this conversation as resolved.

/// \brief Write new manifests for the accumulated data files.
///
Expand All @@ -95,9 +97,18 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
private:
std::string table_name_;
std::unordered_map<int32_t, DataFileSet> new_data_files_by_spec_;
// Stable input summaries for retry-safe summary_ rebuilds.
SnapshotSummaryBuilder added_data_files_summary_;
SnapshotSummaryBuilder appended_manifests_summary_;
// User-provided summary properties restored after summary_ rebuilds.
std::unordered_map<std::string, std::string> custom_summary_properties_;
std::vector<ManifestFile> append_manifests_;
// Original manifests kept to recreate copied manifests after retry cleanup.
std::vector<ManifestFile> append_manifests_to_copy_;
std::vector<ManifestFile> rewritten_append_manifests_;
std::vector<ManifestFile> new_manifests_;
// Manifest count summary from the latest Apply() result.
SnapshotSummaryBuilder manifest_count_summary_;
bool has_new_files_{false};
};

Expand Down
17 changes: 1 addition & 16 deletions src/iceberg/update/merging_snapshot_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -944,26 +944,11 @@ Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply(
result.insert(result.end(), std::make_move_iterator(merged_deletes.begin()),
std::make_move_iterator(merged_deletes.end()));

// Manifest count summary: unassigned manifests count as neither created nor kept.
int32_t manifests_created = 0;
int32_t manifests_kept = 0;
for (const auto& m : result) {
if (m.added_snapshot_id == snapshot_id) {
++manifests_created;
} else if (m.added_snapshot_id != kInvalidSnapshotId) {
++manifests_kept;
}
}
int32_t replaced_manifests_count = data_filter_manager_->ReplacedManifestsCount() +
delete_filter_manager_->ReplacedManifestsCount() +
data_merge_manager_->ReplacedManifestsCount() +
delete_merge_manager_->ReplacedManifestsCount();
summary_builder().Set(SnapshotSummaryFields::kManifestsCreated,
std::to_string(manifests_created));
summary_builder().Set(SnapshotSummaryFields::kManifestsKept,
std::to_string(manifests_kept));
summary_builder().Set(SnapshotSummaryFields::kManifestsReplaced,
std::to_string(replaced_manifests_count));
summary_builder().Merge(BuildManifestCountSummary(result, replaced_manifests_count));

return result;
}
Expand Down
22 changes: 22 additions & 0 deletions src/iceberg/update/snapshot_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,28 @@ std::string SnapshotUpdate::ManifestListPath() {
return ctx_->MetadataFileLocation(filename);
}

SnapshotSummaryBuilder SnapshotUpdate::BuildManifestCountSummary(
std::span<const ManifestFile> manifests, int32_t replaced_manifests_count) {
SnapshotSummaryBuilder summary;
int32_t manifests_created = 0;
int32_t manifests_kept = 0;
int64_t snapshot_id = SnapshotId();
for (const auto& manifest : manifests) {
if (manifest.added_snapshot_id == snapshot_id) {
++manifests_created;
} else if (manifest.added_snapshot_id != kInvalidSnapshotId) {
++manifests_kept;
}
}

summary.Set(SnapshotSummaryFields::kManifestsCreated,
std::to_string(manifests_created));
summary.Set(SnapshotSummaryFields::kManifestsKept, std::to_string(manifests_kept));
summary.Set(SnapshotSummaryFields::kManifestsReplaced,
std::to_string(replaced_manifests_count));
return summary;
}

std::string SnapshotUpdate::ManifestPath() {
// Generate manifest path
// Format: {metadata_location}/{uuid}-m{manifest_count}.avro
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/update/snapshot_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
std::string ManifestPath();
std::string ManifestListPath();
SnapshotSummaryBuilder& summary_builder() { return summary_; }
SnapshotSummaryBuilder BuildManifestCountSummary(
std::span<const ManifestFile> manifests, int32_t replaced_manifests_count);

private:
/// \brief Returns the snapshot summary from the implementation and updates totals.
Expand Down
Loading