[AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec #2081
guixiaowen wants to merge 1 commit into
merrily01
left a comment
Hi @guixiaowen
Could you take a look and get the CI fixed?
https://github.com/apache/auron/actions/runs/26717696445/job/82051444906?pr=2081
weiqingy
left a comment
Thanks for taking this on — reusing NativeHiveTableScanBase and wiring the scan in through the AuronConvertProvider SPI (mirroring PaimonConvertProvider) is a clean fit. A few questions inline.
import org.apache.spark.sql.hive.execution.HiveTableScanExec

class HiveConvertProvider extends AuronConvertProvider with Logging {
  override def isEnabled: Boolean =
AuronConvertProvider declares isEnabled(exec: SparkPlan): Boolean, and the dispatcher calls it with the plan (AuronConverters.scala:216 and :307). This override drops the parameter, so it doesn't implement the trait method — the abstract isEnabled(SparkPlan) stays unimplemented and override matches nothing, which the compiler rejects. That looks like the CI break flagged above. PaimonConvertProvider keeps the parameter and branches inside — would matching that signature work here?
override def isEnabled(exec: SparkPlan): Boolean =
  getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  Math.min(defaultMaxSplitBytes, openCostInBytes)
filesMaxPartitionBytes defaults to 128MB and filesOpenCostInBytes to 4MB, so Math.min here always returns the open-cost (4MB) and every split is capped at it — which would fan a large table out into many tiny partitions. Spark computes the cap as min(maxPartitionBytes, max(openCostInBytes, bytesPerCore)) rather than a straight min of the two. Was min intended here? FilePartition.maxSplitBytes already implements that formula if it helps, though note it's keyed on Seq[PartitionDirectory] rather than the Seq[PartitionedFile] you have here, so it's not a literal drop-in.
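To make the difference concrete, here is a minimal sketch of the formula described above, written as a pure function with no Spark dependency. The object name, the parameter names, and the `minPartitionNum` argument are illustrative assumptions, not code from this PR; in Spark the inputs come from the session configuration and the selected files.

```scala
// Sketch only: a pure-function rendering of the split-size formula.
// SplitSizeSketch and its parameter names are hypothetical.
object SplitSizeSketch {
  def maxSplitBytes(
      maxPartitionBytes: Long,
      openCostInBytes: Long,
      fileSizes: Seq[Long],
      minPartitionNum: Int): Long = {
    // Each file is charged its length plus the fixed open cost.
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    // Target share of bytes per parallel slot (guard against division by zero).
    val bytesPerCore = totalBytes / math.max(minPartitionNum, 1)
    // Never below the open cost, never above the configured maximum.
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }
}
```

With the default 128MB and 4MB settings, a large table is capped at 128MB per split under this formula, while a straight `Math.min` of the two settings would give 4MB regardless of table size.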
private def enableHiveTableScanExec: Boolean =
  getBooleanConf("spark.auron.enable.parquetHiveTableScanExec", defaultValue = false)

override def isSupported(exec: SparkPlan): Boolean =
isSupported returns true for any table whose provider is hive, but convert only builds a native plan when isParquetTable holds and otherwise returns exec unchanged (line 49). So a non-Parquet Hive table (ORC, text) with both flags on would be reported as supported and then handed back un-converted, instead of falling through to the normal conversion path. Would gating isSupported on isParquetTable too keep the two in lockstep?
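A rough sketch of that gating, using stand-in types rather than the real Spark or Auron classes. `TableInfo`, `SupportCheckSketch`, and the input-format test inside `isParquetTable` are all assumptions made for illustration; the PR's actual `isParquetTable` may check something different.

```scala
// Sketch only: hypothetical stand-ins, not the Auron or Spark types.
final case class TableInfo(provider: String, inputFormat: String)

object SupportCheckSketch {
  // Assumed check: treat a table as Parquet when its input format name says so.
  def isParquetTable(t: TableInfo): Boolean =
    t.inputFormat.toLowerCase.contains("parquet")

  // isSupported applies the same Parquet test that convert relies on,
  // so a non-Parquet Hive table is never reported as supported.
  def isSupported(t: TableInfo, parquetScanEnabled: Boolean): Boolean =
    parquetScanEnabled && t.provider == "hive" && isParquetTable(t)
}
```

With this shape, an ORC-backed Hive table returns false from `isSupported` and can fall through to whatever handles it next, instead of being claimed and then returned unchanged.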
    </excludes>
  </relocation>
  <relocation>
    <pattern>io.netty</pattern>
This io.netty relocation was added in #1597 to fix NoClassDefFoundError: io/netty/buffer/Unpooled under shaded-spark — that change bundled netty-buffer/netty-common into the assembly and relocated them so they wouldn't clash. This PR removes the relocation but keeps those bundled deps (pom.xml:48-55), so netty ships at its original io.netty.* coordinates again, which is the state #1597 fixed away. Does dropping it reopen that shaded-spark conflict? Since the relocation only affects the shaded assembly — not the unit tests or the Hive Parquet path here — it'd help to know what in this PR needs it gone, along with the arrow-memory-netty exclusions added to the shims pom.xml.
Which issue does this PR close?
Closes #2080
Rationale for this change
When the Spark plan contains a HiveTableScanExec and the table is stored as Parquet, the scan is converted to the native execution plan NativeParquetHiveTableScanExec.
Two configuration parameters control the conversion:
spark.auron.enable.hiveTable: default is true. When set to true, Hive table conversion is enabled.
spark.auron.enable.parquetHiveTableScanExec: default is false. When set to true, conversion to NativeParquetHiveTableScanExec is enabled.
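As a small illustration of how the two settings might combine, the sketch below models them as a plain map. The object and method names are hypothetical, and the assumption that both flags must be true for the native scan to be attempted is inferred from the review discussion, not confirmed by the code.

```scala
// Sketch only: AuronHiveScanConfSketch is a hypothetical helper, not part of Auron.
object AuronHiveScanConfSketch {
  private val HiveTable = "spark.auron.enable.hiveTable"
  private val ParquetScan = "spark.auron.enable.parquetHiveTableScanExec"

  // Keys and defaults as listed in the PR description.
  val defaults: Map[String, String] = Map(HiveTable -> "true", ParquetScan -> "false")

  // Assumption: the native scan is attempted only when both flags are true.
  def nativeScanEnabled(conf: Map[String, String]): Boolean =
    Seq(HiveTable, ParquetScan).forall(k => conf.getOrElse(k, defaults(k)).toBoolean)

  // Render overrides as spark-submit style "--conf key=value" arguments.
  def toSubmitArgs(overrides: Map[String, String]): Seq[String] =
    overrides.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Under these defaults only `spark.auron.enable.parquetHiveTableScanExec` needs to be set to `true` to opt in.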
What changes are included in this PR?
Are there any user-facing changes?
No.
How was this patch tested?
Unit tests.