From 015336cc5e032a1de86688c61e5303a7380c3809 Mon Sep 17 00:00:00 2001 From: rahulsmahadev Date: Tue, 9 Jun 2026 23:33:02 +0000 Subject: [PATCH 1/2] feat(parquet): support reading list columns as Arrow large_list Addresses the remaining gaps from #502: - Accept LARGE_LIST in ValidateParquetSchemaEvolution wherever LIST is accepted, so schema projection works when the Arrow reader presents 64-bit offset list types. - Add a read.arrow.use-large-list reader property (default: false) that configures the Parquet reader to decode list columns as large_list and aligns the output Arrow schema accordingly. Closes #513 Signed-off-by: rahulsmahadev --- src/iceberg/file_reader.h | 3 + src/iceberg/parquet/parquet_reader.cc | 53 +++++++++++ src/iceberg/parquet/parquet_schema_util.cc | 3 +- src/iceberg/test/parquet_schema_test.cc | 46 ++++++++- src/iceberg/test/parquet_test.cc | 106 +++++++++++++++++++++ 5 files changed, 209 insertions(+), 2 deletions(-) diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index c76c10093..71262128b 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -75,6 +75,9 @@ class ICEBERG_EXPORT ReaderProperties : public ConfigBase { /// \brief The batch size to read. inline static Entry kBatchSize{"read.batch-size", 4096}; + /// \brief Read list columns as Arrow large_list (64-bit offsets) instead of list. + /// Default: false (use 32-bit offset list). + inline static Entry kArrowUseLargeList{"read.arrow.use-large-list", false}; /// \brief Skip GenericDatum in Avro reader for better performance. /// When true, decode directly from Avro to Arrow without GenericDatum intermediate. /// Default: true (skip GenericDatum for better performance). 
diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 775644a94..8bfb802a8 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -41,6 +41,7 @@ #include "iceberg/result.h" #include "iceberg/schema_internal.h" #include "iceberg/schema_util.h" +#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::parquet { @@ -84,6 +85,41 @@ class EmptyRecordBatchReader : public ::arrow::RecordBatchReader { } }; +std::shared_ptr<::arrow::Field> UseLargeListField( + const std::shared_ptr<::arrow::Field>& field); + +// Rebuild a data type with all nested list types replaced by large_list. +std::shared_ptr<::arrow::DataType> UseLargeListType( + const std::shared_ptr<::arrow::DataType>& type) { + switch (type->id()) { + case ::arrow::Type::LIST: { + const auto& list_type = internal::checked_cast(*type); + return ::arrow::large_list(UseLargeListField(list_type.value_field())); + } + case ::arrow::Type::STRUCT: { + ::arrow::FieldVector fields; + fields.reserve(type->num_fields()); + for (const auto& field : type->fields()) { + fields.push_back(UseLargeListField(field)); + } + return ::arrow::struct_(std::move(fields)); + } + case ::arrow::Type::MAP: { + const auto& map_type = internal::checked_cast(*type); + return std::make_shared<::arrow::MapType>(UseLargeListField(map_type.key_field()), + UseLargeListField(map_type.item_field()), + map_type.keys_sorted()); + } + default: + return type; + } +} + +std::shared_ptr<::arrow::Field> UseLargeListField( + const std::shared_ptr<::arrow::Field>& field) { + return field->WithType(UseLargeListType(field->type())); +} + } // namespace // A stateful context to keep track of the reading progress. 
@@ -118,6 +154,10 @@ class ParquetReader::Impl { arrow_reader_properties.set_batch_size( options.properties.Get(ReaderProperties::kBatchSize)); arrow_reader_properties.set_arrow_extensions_enabled(true); + use_large_list_ = options.properties.Get(ReaderProperties::kArrowUseLargeList); + if (use_large_list_) { + arrow_reader_properties.set_list_type(::arrow::Type::LARGE_LIST); + } // Open the Parquet file reader ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options)); @@ -214,6 +254,17 @@ class ParquetReader::Impl { ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_, ::arrow::ImportSchema(&arrow_schema)); + if (use_large_list_) { + // Align the output schema with the large_list arrays produced by the + // Parquet reader when kArrowUseLargeList is enabled. + ::arrow::FieldVector fields; + fields.reserve(context_->output_arrow_schema_->fields().size()); + for (const auto& field : context_->output_arrow_schema_->fields()) { + fields.push_back(UseLargeListField(field)); + } + context_->output_arrow_schema_ = + ::arrow::schema(std::move(fields), context_->output_arrow_schema_->metadata()); + } // Row group pruning based on the split // TODO(gangwu): add row group filtering based on zone map, bloom filter, etc. @@ -255,6 +306,8 @@ class ParquetReader::Impl { ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool(); // The split to read from the Parquet file. std::optional split_; + // Whether to read list columns as large_list (64-bit offsets). + bool use_large_list_ = false; // Schema to read from the Parquet file. std::shared_ptr<::iceberg::Schema> read_schema_; // The projection result to apply to the read schema. 
diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 39e321d9f..4394255ef 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -245,7 +245,8 @@ Status ValidateParquetSchemaEvolution( } break; case TypeId::kList: - if (arrow_type->id() == ::arrow::Type::LIST) { + if (arrow_type->id() == ::arrow::Type::LIST || + arrow_type->id() == ::arrow::Type::LARGE_LIST) { return {}; } break; diff --git a/src/iceberg/test/parquet_schema_test.cc b/src/iceberg/test/parquet_schema_test.cc index 75e99ff12..46f50e1e1 100644 --- a/src/iceberg/test/parquet_schema_test.cc +++ b/src/iceberg/test/parquet_schema_test.cc @@ -111,12 +111,14 @@ ::parquet::schema::NodePtr MakeMapNode(const std::string& name, // Helper to create SchemaManifest from Parquet schema ::parquet::arrow::SchemaManifest MakeSchemaManifest( - const ::parquet::schema::NodePtr& parquet_schema) { + const ::parquet::schema::NodePtr& parquet_schema, + ::arrow::Type::type list_type = ::arrow::Type::LIST) { auto parquet_schema_descriptor = std::make_shared<::parquet::SchemaDescriptor>(); parquet_schema_descriptor->Init(parquet_schema); auto properties = ::parquet::default_arrow_reader_properties(); properties.set_arrow_extensions_enabled(true); + properties.set_list_type(list_type); ::parquet::arrow::SchemaManifest manifest; auto status = ::parquet::arrow::SchemaManifest::Make(parquet_schema_descriptor.get(), @@ -340,6 +342,16 @@ TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsNullPhysicalType) ASSERT_THAT(status, IsOk()); } +TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsLargeList) { + ::parquet::arrow::SchemaField parquet_field; + parquet_field.field = ::arrow::field("numbers", ::arrow::large_list(::arrow::int32())); + + ListType expected_type( + SchemaField::MakeOptional(/*field_id=*/101, "element", iceberg::int32())); + auto status = ValidateParquetSchemaEvolution(expected_type, 
parquet_field); + ASSERT_THAT(status, IsOk()); +} + TEST(ParquetSchemaProjectionTest, ProjectNullPhysicalFieldsAsNull) { Schema expected_schema({ SchemaField::MakeOptional(/*field_id=*/1, "age", iceberg::int32()), @@ -624,6 +636,38 @@ TEST(ParquetSchemaProjectionTest, ProjectListType) { ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); } +TEST(ParquetSchemaProjectionTest, ProjectLargeListType) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "numbers", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeListNode("numbers", MakeInt32Node("element", /*field_id=*/101), + /*field_id=*/2), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema, ::arrow::Type::LARGE_LIST); + ASSERT_EQ(schema_manifest.schema_fields[0].field->type()->id(), + ::arrow::Type::LARGE_LIST); + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + ASSERT_EQ(projection.fields[0].children.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0})); +} + TEST(ParquetSchemaProjectionTest, ProjectMapType) { Schema expected_schema({ SchemaField::MakeOptional( diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index ee1cbc931..3c103a046 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -186,6 +186,32 @@ class ParquetReaderTest : public TempFileTestBase { .properties = std::move(writer_properties)})); } + void CreateListParquetFile() { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "numbers", + 
std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", int32())))}); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = ::arrow::json::ArrayFromJSONString( + ::arrow::struct_(arrow_schema->fields()), + R"([[1, [1, 2]], [2, [3]], [3, null]])") + .ValueOrDie(); + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + + ASSERT_TRUE(WriteArray(array, {.path = temp_parquet_file_, + .schema = schema, + .io = file_io_, + .properties = std::move(writer_properties)})); + } + void CreateSplitParquetFile() { const std::string kParquetFieldIdKey = "PARQUET:field_id"; auto arrow_schema = ::arrow::schema( @@ -339,6 +365,86 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(ParquetReaderTest, ReadListType) { + CreateListParquetFile(); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "numbers", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", int32())))}); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kParquet, + {.path = temp_parquet_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + // By default list columns are read as 32-bit offset list arrays. 
+ auto schema_result = reader->Schema(); + ASSERT_THAT(schema_result, IsOk()); + auto arrow_c_schema = std::move(schema_result.value()); + auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + ASSERT_EQ(arrow_type->field(1)->type()->id(), ::arrow::Type::LIST); + + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([[1, [1, 2]], [2, [3]], [3, null]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +TEST_F(ParquetReaderTest, ReadListAsLargeList) { + CreateListParquetFile(); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "numbers", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", int32())))}); + + ReaderProperties reader_properties; + reader_properties.Set(ReaderProperties::kArrowUseLargeList, true); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kParquet, {.path = temp_parquet_file_, + .io = file_io_, + .projection = schema, + .properties = std::move(reader_properties)}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + // The output schema should expose list columns as large_list. + auto schema_result = reader->Schema(); + ASSERT_THAT(schema_result, IsOk()); + auto arrow_c_schema = std::move(schema_result.value()); + auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + ASSERT_EQ(arrow_type->field(1)->type()->id(), ::arrow::Type::LARGE_LIST); + + // JSON parsing creates regular ListArray, so verify large_list data manually. 
+ auto data = reader->Next(); + ASSERT_THAT(data, IsOk()); + ASSERT_TRUE(data.value().has_value()); + auto arrow_c_array = data.value().value(); + auto arrow_array = ::arrow::ImportArray(&arrow_c_array, arrow_type).ValueOrDie(); + + const auto& struct_array = + internal::checked_cast(*arrow_array); + ASSERT_EQ(struct_array.length(), 3); + + const auto& id_array = + internal::checked_cast(*struct_array.field(0)); + ASSERT_EQ(id_array.Value(0), 1); + ASSERT_EQ(id_array.Value(1), 2); + ASSERT_EQ(id_array.Value(2), 3); + + const auto& numbers_array = + internal::checked_cast(*struct_array.field(1)); + ASSERT_EQ(numbers_array.value_slice(0)->length(), 2); + ASSERT_EQ(numbers_array.value_slice(1)->length(), 1); + ASSERT_TRUE(numbers_array.IsNull(2)); + + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + TEST_F(ParquetReaderTest, ReadSplit) { CreateSplitParquetFile(); From 6773aa9d1309049ef1a753d63cea62c5846d0d41 Mon Sep 17 00:00:00 2001 From: rahulsmahadev Date: Wed, 10 Jun 2026 00:19:34 +0000 Subject: [PATCH 2/2] style: apply clang-format Signed-off-by: rahulsmahadev --- src/iceberg/test/parquet_test.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 3c103a046..a2d3d6ece 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -197,10 +197,10 @@ class ParquetReaderTest : public TempFileTestBase { ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); - auto array = ::arrow::json::ArrayFromJSONString( - ::arrow::struct_(arrow_schema->fields()), - R"([[1, [1, 2]], [2, [3]], [3, null]])") - .ValueOrDie(); + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([[1, [1, 2]], [2, [3]], [3, null]])") + .ValueOrDie(); WriterProperties writer_properties; writer_properties.Set(WriterProperties::kParquetCompression,