Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/iceberg/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class ICEBERG_EXPORT ReaderProperties : public ConfigBase<ReaderProperties> {

/// \brief The batch size to read.
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
/// \brief Read list columns as Arrow large_list (64-bit offsets) instead of list.
/// Default: false (use 32-bit offset list).
inline static Entry<bool> kArrowUseLargeList{"read.arrow.use-large-list", false};
/// \brief Skip GenericDatum in Avro reader for better performance.
/// When true, decode directly from Avro to Arrow without GenericDatum intermediate.
/// Default: true (skip GenericDatum for better performance).
Expand Down
53 changes: 53 additions & 0 deletions src/iceberg/parquet/parquet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "iceberg/result.h"
#include "iceberg/schema_internal.h"
#include "iceberg/schema_util.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

namespace iceberg::parquet {
Expand Down Expand Up @@ -84,6 +85,41 @@ class EmptyRecordBatchReader : public ::arrow::RecordBatchReader {
}
};

// Forward declaration: UseLargeListType() and UseLargeListField() call each other,
// so the field-level helper must be declared before the type-level one is defined.
std::shared_ptr<::arrow::Field> UseLargeListField(
const std::shared_ptr<::arrow::Field>& field);

// Returns `type` with every LIST reachable through list, struct and map nesting
// rebuilt as LARGE_LIST.
//
// - LIST:   becomes large_list over the (recursively converted) value field.
// - STRUCT: is rebuilt from its recursively converted child fields.
// - MAP:    is rebuilt from its recursively converted key and item fields, keeping
//           the original keys_sorted flag.
// - Any other type id is returned unchanged (the same shared_ptr is handed back).
std::shared_ptr<::arrow::DataType> UseLargeListType(
    const std::shared_ptr<::arrow::DataType>& type) {
  const auto type_id = type->id();

  if (type_id == ::arrow::Type::LIST) {
    const auto& as_list = internal::checked_cast<const ::arrow::ListType&>(*type);
    auto element_field = UseLargeListField(as_list.value_field());
    return ::arrow::large_list(element_field);
  }

  if (type_id == ::arrow::Type::STRUCT) {
    ::arrow::FieldVector converted_children;
    converted_children.reserve(type->num_fields());
    for (const auto& child : type->fields()) {
      converted_children.push_back(UseLargeListField(child));
    }
    return ::arrow::struct_(std::move(converted_children));
  }

  if (type_id == ::arrow::Type::MAP) {
    const auto& as_map = internal::checked_cast<const ::arrow::MapType&>(*type);
    auto converted_key = UseLargeListField(as_map.key_field());
    auto converted_item = UseLargeListField(as_map.item_field());
    return std::make_shared<::arrow::MapType>(converted_key, converted_item,
                                              as_map.keys_sorted());
  }

  // Not a container handled above: nothing to rewrite.
  return type;
}

// Returns a copy of `field` (via Field::WithType) whose type has been passed through
// UseLargeListType(), i.e. with nested LIST types replaced by LARGE_LIST.
std::shared_ptr<::arrow::Field> UseLargeListField(
    const std::shared_ptr<::arrow::Field>& field) {
  const std::shared_ptr<::arrow::DataType> converted_type =
      UseLargeListType(field->type());
  return field->WithType(converted_type);
}

} // namespace

// A stateful context to keep track of the reading progress.
Expand Down Expand Up @@ -118,6 +154,10 @@ class ParquetReader::Impl {
arrow_reader_properties.set_batch_size(
options.properties.Get(ReaderProperties::kBatchSize));
arrow_reader_properties.set_arrow_extensions_enabled(true);
use_large_list_ = options.properties.Get(ReaderProperties::kArrowUseLargeList);
if (use_large_list_) {
arrow_reader_properties.set_list_type(::arrow::Type::LARGE_LIST);
}

// Open the Parquet file reader
ICEBERG_ASSIGN_OR_RAISE(input_stream_, OpenInputStream(options));
Expand Down Expand Up @@ -214,6 +254,17 @@ class ParquetReader::Impl {
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_,
::arrow::ImportSchema(&arrow_schema));
if (use_large_list_) {
// Align the output schema with the large_list arrays produced by the
// Parquet reader when kArrowUseLargeList is enabled.
::arrow::FieldVector fields;
fields.reserve(context_->output_arrow_schema_->fields().size());
for (const auto& field : context_->output_arrow_schema_->fields()) {
fields.push_back(UseLargeListField(field));
}
context_->output_arrow_schema_ =
::arrow::schema(std::move(fields), context_->output_arrow_schema_->metadata());
}

// Row group pruning based on the split
// TODO(gangwu): add row group filtering based on zone map, bloom filter, etc.
Expand Down Expand Up @@ -255,6 +306,8 @@ class ParquetReader::Impl {
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
// The split to read from the Parquet file.
std::optional<Split> split_;
// Whether to read list columns as large_list (64-bit offsets).
bool use_large_list_ = false;
// Schema to read from the Parquet file.
std::shared_ptr<::iceberg::Schema> read_schema_;
// The projection result to apply to the read schema.
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/parquet/parquet_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ Status ValidateParquetSchemaEvolution(
}
break;
case TypeId::kList:
if (arrow_type->id() == ::arrow::Type::LIST) {
if (arrow_type->id() == ::arrow::Type::LIST ||
arrow_type->id() == ::arrow::Type::LARGE_LIST) {
return {};
}
break;
Expand Down
46 changes: 45 additions & 1 deletion src/iceberg/test/parquet_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ ::parquet::schema::NodePtr MakeMapNode(const std::string& name,

// Helper to create SchemaManifest from Parquet schema
::parquet::arrow::SchemaManifest MakeSchemaManifest(
const ::parquet::schema::NodePtr& parquet_schema) {
const ::parquet::schema::NodePtr& parquet_schema,
::arrow::Type::type list_type = ::arrow::Type::LIST) {
auto parquet_schema_descriptor = std::make_shared<::parquet::SchemaDescriptor>();
parquet_schema_descriptor->Init(parquet_schema);

auto properties = ::parquet::default_arrow_reader_properties();
properties.set_arrow_extensions_enabled(true);
properties.set_list_type(list_type);

::parquet::arrow::SchemaManifest manifest;
auto status = ::parquet::arrow::SchemaManifest::Make(parquet_schema_descriptor.get(),
Expand Down Expand Up @@ -340,6 +342,16 @@ TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsNullPhysicalType)
ASSERT_THAT(status, IsOk());
}

// An Iceberg list type must validate against a Parquet field whose Arrow type is
// large_list, not only against a regular list.
TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsLargeList) {
  // Physical side: an Arrow field typed as large_list<int32>.
  ::parquet::arrow::SchemaField large_list_field;
  large_list_field.field =
      ::arrow::field("numbers", ::arrow::large_list(::arrow::int32()));

  // Logical side: an Iceberg list of optional int32 elements.
  ListType iceberg_list_type(
      SchemaField::MakeOptional(/*field_id=*/101, "element", iceberg::int32()));

  ASSERT_THAT(ValidateParquetSchemaEvolution(iceberg_list_type, large_list_field),
              IsOk());
}

TEST(ParquetSchemaProjectionTest, ProjectNullPhysicalFieldsAsNull) {
Schema expected_schema({
SchemaField::MakeOptional(/*field_id=*/1, "age", iceberg::int32()),
Expand Down Expand Up @@ -624,6 +636,38 @@ TEST(ParquetSchemaProjectionTest, ProjectListType) {
ASSERT_EQ(SelectedColumnIndices(projection), std::vector<int32_t>({0, 1}));
}

// Projection of an Iceberg list column must succeed when the schema manifest was
// built with the Arrow reader configured to produce LARGE_LIST.
TEST(ParquetSchemaProjectionTest, ProjectLargeListType) {
  // Physical Parquet schema: a single list column "numbers" of int32 elements.
  auto parquet_root = MakeGroupNode(
      "iceberg_schema",
      {
          MakeListNode("numbers", MakeInt32Node("element", /*field_id=*/101),
                       /*field_id=*/2),
      });

  // Requested Iceberg schema with matching field ids.
  Schema iceberg_schema({
      SchemaField::MakeOptional(
          /*field_id=*/2, "numbers",
          std::make_shared<ListType>(SchemaField::MakeOptional(
              /*field_id=*/101, "element", iceberg::int32()))),
  });

  // Sanity check: the manifest really exposes the column as large_list.
  auto manifest = MakeSchemaManifest(parquet_root, ::arrow::Type::LARGE_LIST);
  ASSERT_EQ(manifest.schema_fields[0].field->type()->id(), ::arrow::Type::LARGE_LIST);

  auto projected = Project(iceberg_schema, manifest);
  ASSERT_THAT(projected, IsOk());
  const auto& projection = *projected;

  // One top-level field projected from position 0 ...
  ASSERT_EQ(projection.fields.size(), 1);
  const auto& numbers_projection = projection.fields[0];
  ASSERT_PROJECTED_FIELD(numbers_projection, 0);

  // ... with a single element child, also projected from position 0.
  ASSERT_EQ(numbers_projection.children.size(), 1);
  ASSERT_PROJECTED_FIELD(numbers_projection.children[0], 0);

  ASSERT_EQ(SelectedColumnIndices(projection), std::vector<int32_t>({0}));
}

TEST(ParquetSchemaProjectionTest, ProjectMapType) {
Schema expected_schema({
SchemaField::MakeOptional(
Expand Down
106 changes: 106 additions & 0 deletions src/iceberg/test/parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,32 @@ class ParquetReaderTest : public TempFileTestBase {
.properties = std::move(writer_properties)}));
}

void CreateListParquetFile() {
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(2, "numbers",
std::make_shared<ListType>(SchemaField::MakeOptional(
/*field_id=*/101, "element", int32())))});

ArrowSchema arrow_c_schema;
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();

auto array =
::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()),
R"([[1, [1, 2]], [2, [3]], [3, null]])")
.ValueOrDie();

WriterProperties writer_properties;
writer_properties.Set(WriterProperties::kParquetCompression,
std::string("uncompressed"));

ASSERT_TRUE(WriteArray(array, {.path = temp_parquet_file_,
.schema = schema,
.io = file_io_,
.properties = std::move(writer_properties)}));
}

void CreateSplitParquetFile() {
const std::string kParquetFieldIdKey = "PARQUET:field_id";
auto arrow_schema = ::arrow::schema(
Expand Down Expand Up @@ -339,6 +365,86 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

// Without kArrowUseLargeList set, a list column is exposed as a regular Arrow list
// and the data round-trips unchanged.
TEST_F(ParquetReaderTest, ReadListType) {
  CreateListParquetFile();

  auto projection_schema = std::make_shared<Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", int32()),
      SchemaField::MakeOptional(2, "numbers",
                                std::make_shared<ListType>(SchemaField::MakeOptional(
                                    /*field_id=*/101, "element", int32())))});

  auto opened = ReaderFactoryRegistry::Open(
      FileFormatType::kParquet,
      {.path = temp_parquet_file_, .io = file_io_, .projection = projection_schema});
  ASSERT_THAT(opened, IsOk());
  auto reader = std::move(opened.value());

  // By default list columns are read as 32-bit offset list arrays.
  auto reported_schema = reader->Schema();
  ASSERT_THAT(reported_schema, IsOk());
  auto c_schema = std::move(reported_schema.value());
  auto imported_type = ::arrow::ImportType(&c_schema).ValueOrDie();
  ASSERT_EQ(imported_type->field(1)->type()->id(), ::arrow::Type::LIST);

  // All three rows arrive in one batch, then the reader is exhausted.
  ASSERT_NO_FATAL_FAILURE(
      VerifyNextBatch(*reader, R"([[1, [1, 2]], [2, [3]], [3, null]])"));
  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

// With ReaderProperties::kArrowUseLargeList enabled, the reader must expose list
// columns as large_list both in the reported schema and in the returned arrays, and
// the list contents must match what was written by CreateListParquetFile().
TEST_F(ParquetReaderTest, ReadListAsLargeList) {
  CreateListParquetFile();

  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", int32()),
      SchemaField::MakeOptional(2, "numbers",
                                std::make_shared<ListType>(SchemaField::MakeOptional(
                                    /*field_id=*/101, "element", int32())))});

  ReaderProperties reader_properties;
  reader_properties.Set(ReaderProperties::kArrowUseLargeList, true);

  auto reader_result = ReaderFactoryRegistry::Open(
      FileFormatType::kParquet, {.path = temp_parquet_file_,
                                 .io = file_io_,
                                 .projection = schema,
                                 .properties = std::move(reader_properties)});
  ASSERT_THAT(reader_result, IsOk());
  auto reader = std::move(reader_result.value());

  // The output schema should expose list columns as large_list.
  auto schema_result = reader->Schema();
  ASSERT_THAT(schema_result, IsOk());
  auto arrow_c_schema = std::move(schema_result.value());
  auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
  ASSERT_EQ(arrow_type->field(1)->type()->id(), ::arrow::Type::LARGE_LIST);

  // JSON parsing creates regular ListArray, so verify large_list data manually.
  auto data = reader->Next();
  ASSERT_THAT(data, IsOk());
  ASSERT_TRUE(data.value().has_value());
  auto arrow_c_array = data.value().value();
  auto arrow_array = ::arrow::ImportArray(&arrow_c_array, arrow_type).ValueOrDie();

  const auto& struct_array =
      internal::checked_cast<const ::arrow::StructArray&>(*arrow_array);
  ASSERT_EQ(struct_array.length(), 3);

  const auto& id_array =
      internal::checked_cast<const ::arrow::Int32Array&>(*struct_array.field(0));
  ASSERT_EQ(id_array.Value(0), 1);
  ASSERT_EQ(id_array.Value(1), 2);
  ASSERT_EQ(id_array.Value(2), 3);

  const auto& numbers_array =
      internal::checked_cast<const ::arrow::LargeListArray&>(*struct_array.field(1));

  // Row validity: the first two lists are present, the third is null.
  ASSERT_TRUE(numbers_array.IsValid(0));
  ASSERT_TRUE(numbers_array.IsValid(1));
  ASSERT_TRUE(numbers_array.IsNull(2));

  // Check the element values as well as the list lengths, so that wrong offsets or
  // wrong child data of the right length cannot slip through.
  const auto first_slice = numbers_array.value_slice(0);
  ASSERT_EQ(first_slice->length(), 2);
  const auto& first_values =
      internal::checked_cast<const ::arrow::Int32Array&>(*first_slice);
  ASSERT_EQ(first_values.Value(0), 1);
  ASSERT_EQ(first_values.Value(1), 2);

  const auto second_slice = numbers_array.value_slice(1);
  ASSERT_EQ(second_slice->length(), 1);
  const auto& second_values =
      internal::checked_cast<const ::arrow::Int32Array&>(*second_slice);
  ASSERT_EQ(second_values.Value(0), 3);

  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(ParquetReaderTest, ReadSplit) {
CreateSplitParquetFile();

Expand Down
Loading