[lake/hudi] Add Hudi Flink union read primary key table IT#3541
[lake/hudi] Add Hudi Flink union read primary key table IT#3541fhan688 wants to merge 4 commits into
Conversation
|
please help review, thanks! @XuQianJin-Stars |
There was a problem hiding this comment.
Pull request overview
This PR extends the Hudi lake integration to support Flink union-read for primary-key tables by introducing a Hudi-specific SortedRecordReader implementation, and adds end-to-end integration tests plus a focused unit test to validate projection constraints for sorting by Hudi record keys.
Changes:
- Add
HudiSortedRecordReaderto provide primary-key ordering for Hudi MERGE_ON_READ splits (enabling sort-merge union reads with Fluss changelog records). - Update
HudiLakeSourceto chooseHudiSortedRecordReaderwhen the Hudi table is MERGE_ON_READ and the projection retains all record-key fields. - Add new IT coverage for union-read PK tables and a UT for
canSortByRecordKey(...)projection rules.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReader.java | New sorted reader for Hudi PK union reads; resolves record-key columns and enforces ordering. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java | Routes to the sorted reader for MERGE_ON_READ when record-key columns are preserved by projection. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/flink/FlinkUnionReadPrimaryKeyTableITCase.java | New end-to-end Flink union-read ITs for Hudi-backed primary-key tables (batch/stream/failover). |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReaderTest.java | New UT validating projection eligibility for sorting by Hudi record keys. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| @ParameterizedTest | ||
| @ValueSource(booleans = {false}) |
There was a problem hiding this comment.
Fixed. I added the partitioned case to testUnionReadFullType and verified both parameterized runs pass.
| } finally { | ||
| iterator.close(); | ||
| } | ||
| records.sort( |
There was a problem hiding this comment.
The read() method loads all records into memory before sorting. This may cause OOM for large-scale datasets. Suggestion: Consider using external sort or limiting the number of records read in a single batch.
There was a problem hiding this comment.
Good point. The current implementation sorts records within one Hudi split to satisfy the SortedRecordReader contract. Introducing spill/external sort would be a larger follow-up improvement, so I’d keep this PR focused on correctness for now.
| private static int compareValues(Object value1, Object value2, LogicalType logicalType) { | ||
| switch (logicalType.getTypeRoot()) { | ||
| case BOOLEAN: | ||
| return Boolean.compare((boolean) value1, (boolean) value2); |
There was a problem hiding this comment.
NullPointerException Risk: The compareValues method directly unboxes BOOLEAN type without handling null values. Suggestion: Check for null values before comparison, or ensure the caller does not pass null.
There was a problem hiding this comment.
Fixed. I added an explicit non-null state check before reading key fields, so compareValues will not receive null key values.
| LogicalType logicalType = dataType.getLogicalType(); | ||
| switch (logicalType.getTypeRoot()) { | ||
| case BOOLEAN: | ||
| return row.getBoolean(pos); |
There was a problem hiding this comment.
getField Method Does Not Handle Null Values: Directly calls row.getBoolean(pos) etc., but if the field is null in the record, it will return default values instead of null.
There was a problem hiding this comment.
Fixed at the compareRows entry point. Since Hudi record keys map to Fluss primary key fields, null key values are invalid and now fail fast with a clear state check.
| List<LogRecord> records = new ArrayList<>(); | ||
| try { | ||
| while (iterator.hasNext()) { | ||
| records.add(copyRecord(iterator.next(), sortOrder.producedTypes)); |
There was a problem hiding this comment.
Unnecessary Deep Copy: Deep copies all records before sorting, increasing memory and CPU overhead. Suggestion: If HudiRecordReader returns LogRecord that are already independent (not sharing internal buffers), this copy can be omitted.
There was a problem hiding this comment.
The copy is still required here. HudiRecordReader reuses the projected row wrapper, while this reader buffers records before sorting. I added a comment to make this ownership requirement explicit.
| for (Schema.UnresolvedColumn column : | ||
| hudiTableInfo.getHudiTable().getUnresolvedSchema().getColumns()) { | ||
| if (SYSTEM_COLUMNS.containsKey(column.getName())) { | ||
| continue; |
There was a problem hiding this comment.
Position Calculation Issue in resolveRecordKeyInfo: When skipping SYSTEM_COLUMNS, the position calculation may be inconsistent with the actual schema position.
There was a problem hiding this comment.
The behavior is intentional because project indexes are based on Fluss user fields, while Hudi system columns are appended internally and stripped from produced records. I renamed the variables to userFieldPosition/keyUserFieldPositions to make this clearer.
|
|
||
| import org.apache.hudi.common.model.HoodieTableType; | ||
| import org.apache.hudi.org.apache.avro.Schema; | ||
| import org.apache.hudi.source.ExpressionPredicates; |
There was a problem hiding this comment.
Unused Import: The constructor receives a predicates parameter but it is not used.
There was a problem hiding this comment.
This is already used: predicates is passed into HudiRecordReader when creating the delegate reader, so I kept it unchanged.
| import static org.apache.fluss.utils.Preconditions.checkState; | ||
|
|
||
| /** Sorted Hudi record reader for primary key table union read. */ | ||
| public class HudiSortedRecordReader implements SortedRecordReader { |
There was a problem hiding this comment.
do we real need to implement SortedRecordReader for hudi?
To me, the cost is high since we will need to sort, which is easy to oom. It's only need for batch reading, and not required for streaming reading.
There was a problem hiding this comment.
Thanks for raising this. I agree that the sorted reader should only be required for batch primary-key union
read, because the current batch LakeSnapshotAndLogSplitScanner relies on SortedRecordReader for sort-merge with Fluss changelog records.
For streaming read, sorting is not required. The current HudiLakeSource selection is too broad because
createRecordReader cannot distinguish whether the caller needs sorted records, so streaming paths may also pay the sorting cost. I will refine the reader selection so HudiSortedRecordReader is only used when the
batch union-read path requires sorted lake records, and keep normal HudiRecordReader for streaming reads.
Linked issue: #3284
Extend the Hudi lake integration so Flink union read works for primary key tables. Building on the union-read test base added in #3522 and the log-table IT in #3528, this PR adds the missing piece for PK tables: a sorted record reader for Hudi merge-on-read splits (so remote Hudi rows can be merged with Fluss log records by primary key) plus end-to-end IT coverage.
Brief change log
fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReader.java— newSortedRecordReaderimplementation for Hudi. Resolves record-key columns fromFlinkOptions.RECORD_KEY_FIELD, drains the underlyingHudiRecordReader, sorts in memory by the projected key positions, and exposesorder()so the union-read merger can interleave remote Hudi rows with Fluss log records. Includes acanSortByRecordKey(...)predicate that rejects projections dropping any record-key column.fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java— increateRecordReader, route toHudiSortedRecordReaderwhen the Hudi table isMERGE_ON_READand the projection retains every record-key field; otherwise keep the existingHudiRecordReaderpath. Mirrors the contract already used byPaimonLakeSource/PaimonSortedRecordReader.Tests
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/flink/FlinkUnionReadPrimaryKeyTableITCase.java(extendsFlinkUnionReadTestBase):testUnionReadFullType— batch union read over a full-type PK schema, projection (select c3, c4,select c3) and predicate pushdown (c4 = 30); validates state both before and after upsert.testUnionReadInStreamMode— streaming changelog correctness on partitioned and non-partitioned PK tables, including-U/+Uemission after upsert.testUnionReadPrimaryKeyTableFailover—INSERT ... SELECTpipeline stopped with a canonical savepoint and resumed; verifies changelog continuity across restart.fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReaderTest.java— coverscanSortByRecordKeyprojection rules for single-column and composite primary keys.API and Format
No public API or storage format changes.
HudiSortedRecordReaderreuses the existingorg.apache.fluss.lake.source.SortedRecordReadercontract defined influss-common.Documentation
No new user-facing feature; no documentation changes required.