diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java index 15352f63b6..661f958a0f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java @@ -188,10 +188,10 @@ private void initializeLakeRecordIterators() throws IOException { List recordReaders = new ArrayList<>(); if (lakeSplits == null || lakeSplits.isEmpty()) { // pass null split to get rowComparator - recordReaders.add(lakeSource.createRecordReader(() -> null)); + recordReaders.add(lakeSource.createRecordReader(sortedReaderContext(null))); } else { for (LakeSplit lakeSplit : lakeSplits) { - recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit)); + recordReaders.add(lakeSource.createRecordReader(sortedReaderContext(lakeSplit))); } } for (RecordReader reader : recordReaders) { @@ -206,6 +206,21 @@ private void initializeLakeRecordIterators() throws IOException { lakeRecordIteratorsInitialized = true; } + private LakeSource.ReaderContext sortedReaderContext(@Nullable LakeSplit lakeSplit) { + return new LakeSource.ReaderContext() { + @Nullable + @Override + public LakeSplit lakeSplit() { + return lakeSplit; + } + + @Override + public boolean requireSortedRecords() { + return true; + } + }; + } + private void pollLogRecords(Duration timeout) { ScanRecords scanRecords = logScanner.poll(timeout); for (ScanRecord scanRecord : scanRecords) { diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeSource.java b/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeSource.java index 49a6c248e5..bec63a9dac 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeSource.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeSource.java @@ -21,6 +21,8 @@ import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; import org.apache.fluss.predicate.Predicate; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -91,12 +93,28 @@ interface PlannerContext extends Serializable { } /** - * Context interface for record readers, providing access to the lake split being read. + * Context interface for record readers, providing access to the lake split being read and the + * required reading semantics. * * @param The type of lake split */ interface ReaderContext extends Serializable { + /** + * Returns the lake split to read. + * + *

The split can be null when the caller only needs reader-level metadata, such as a sort + * comparator. + */ + @Nullable Split lakeSplit(); + + /** + * Returns whether records produced by this reader must follow the order defined by {@link + * SortedRecordReader#order()}. + */ + default boolean requireSortedRecords() { + return false; + } } /** diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java index 7b16368622..0fdf8cc9b7 100644 --- a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java @@ -27,6 +27,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.predicate.Predicate; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.org.apache.avro.Schema; import org.apache.hudi.source.ExpressionPredicates; import org.apache.hudi.util.StreamerUtil; @@ -102,6 +103,17 @@ public Planner createPlanner(PlannerContext context) throws IOExcepti @Override public RecordReader createRecordReader(ReaderContext context) throws IOException { try { + if (context.requireSortedRecords()) { + if (!shouldUseSortedRecordReader()) { + throw new UnsupportedOperationException( + "Hudi lake source can not provide sorted records for " + + tablePath + + ". Sorted records are only supported for MERGE_ON_READ tables" + + " whose projection contains all Hudi record key fields."); + } + return new HudiSortedRecordReader( + hudiConfig, tablePath, context.lakeSplit(), project, predicates); + } return new HudiRecordReader( hudiConfig, tablePath, context.lakeSplit(), project, predicates); } catch (Exception e) { @@ -121,4 +133,11 @@ private Schema getHudiSchema() { throw new RuntimeException("Fail to get Hudi schema for " + tablePath + ".", e); } } + + private boolean shouldUseSortedRecordReader() throws IOException { + try (HudiTableInfo hudiTableInfo = HudiTableInfo.create(tablePath, hudiConfig)) { + return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ + && HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project); + } + } } diff --git a/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReader.java b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReader.java new file mode 100644 index 0000000000..efa3f1bc60 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReader.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.source; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.hudi.utils.HudiTableInfo; +import org.apache.fluss.lake.source.RecordReader; +import org.apache.fluss.lake.source.SortedRecordReader; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.GenericRecord; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.InternalRowUtils; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.source.ExpressionPredicates; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.fluss.lake.hudi.HudiLakeCatalog.SYSTEM_COLUMNS; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** Sorted Hudi record reader for primary key table union read. */ +public class HudiSortedRecordReader implements SortedRecordReader { + + private static final String DELIMITER = ","; + + private final @Nullable RecordReader delegate; + private final SortOrder sortOrder; + + public HudiSortedRecordReader( + Configuration hudiConfig, + TablePath tablePath, + @Nullable HudiSplit hudiSplit, + @Nullable int[][] project, + List predicates) + throws Exception { + this.delegate = + hudiSplit == null + ? null + : new HudiRecordReader( + hudiConfig, tablePath, hudiSplit, project, predicates); + this.sortOrder = createSortOrder(hudiConfig, tablePath, project); + } + + @Override + public CloseableIterator read() throws IOException { + if (delegate == null) { + return CloseableIterator.wrap(Collections.emptyIterator()); + } + CloseableIterator iterator = delegate.read(); + // TODO: Introduce a spillable sorter for large Hudi splits to avoid keeping all records + // in memory while preserving the SortedRecordReader order contract. + List records = new ArrayList<>(); + try { + while (iterator.hasNext()) { + // HudiRecordReader reuses its projected row wrapper, so records must be copied + // before they are buffered for sorting. + records.add(copyRecord(iterator.next(), sortOrder.producedTypes)); + } + } finally { + iterator.close(); + } + records.sort( + (record1, record2) -> + compareRows( + record1.getRow(), + record2.getRow(), + sortOrder.keyPositionsInRecord, + sortOrder.keyTypes)); + return CloseableIterator.wrap(records.iterator()); + } + + @Override + public Comparator order() { + return (row1, row2) -> + compareRows(row1, row2, sortOrder.keyPositionsInKey, sortOrder.keyTypes); + } + + static boolean canSortByRecordKey(HudiTableInfo hudiTableInfo, @Nullable int[][] project) { + RecordKeyInfo recordKeyInfo = resolveRecordKeyInfo(hudiTableInfo); + if (project == null) { + return true; + } + for (int userFieldPosition : recordKeyInfo.keyUserFieldPositions) { + if (findProducedPosition(userFieldPosition, project) < 0) { + return false; + } + } + return true; + } + + private static SortOrder createSortOrder( + Configuration hudiConfig, TablePath tablePath, @Nullable int[][] project) + throws IOException { + try (HudiTableInfo hudiTableInfo = HudiTableInfo.create(tablePath, hudiConfig)) { + return resolveSortOrder(hudiTableInfo, project); + } + } + + private static SortOrder resolveSortOrder( + HudiTableInfo hudiTableInfo, @Nullable int[][] project) { + RecordKeyInfo recordKeyInfo = resolveRecordKeyInfo(hudiTableInfo); + int[] keyPositions = new int[recordKeyInfo.keyUserFieldPositions.length]; + for (int i = 0; i < recordKeyInfo.keyUserFieldPositions.length; i++) { + int keyPosition = findProducedPosition(recordKeyInfo.keyUserFieldPositions[i], project); + if (keyPosition < 0) { + throw new IllegalArgumentException( + "Can not find Hudi record key field in projected fields."); + } + keyPositions[i] = keyPosition; + } + return new SortOrder( + recordKeyInfo.keyTypes, + keyPositions, + producedTypes(recordKeyInfo.userDataTypes, project)); + } + + private static RecordKeyInfo resolveRecordKeyInfo(HudiTableInfo hudiTableInfo) { + String recordKeyFields = + hudiTableInfo.getTableOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()); + if (recordKeyFields == null || recordKeyFields.trim().isEmpty()) { + throw new IllegalArgumentException( + "Hudi record key field is required for sorted primary key table reader."); + } + + Map dataTypesByName = new HashMap<>(); + Map userFieldPositionsByName = new HashMap<>(); + List userDataTypes = new ArrayList<>(); + int userFieldPosition = 0; + for (Schema.UnresolvedColumn column : + hudiTableInfo.getHudiTable().getUnresolvedSchema().getColumns()) { + if (SYSTEM_COLUMNS.containsKey(column.getName())) { + continue; + } + DataType dataType = getDataType(column); + dataTypesByName.put(column.getName(), dataType); + userFieldPositionsByName.put(column.getName(), userFieldPosition++); + userDataTypes.add(dataType); + } + + List keyFields = + Arrays.stream(recordKeyFields.split(DELIMITER)) + .map(String::trim) + .filter(key -> !key.isEmpty()) + .collect(Collectors.toList()); + + List keyTypes = new ArrayList<>(keyFields.size()); + int[] keyUserFieldPositions = new int[keyFields.size()]; + for (int i = 0; i < keyFields.size(); i++) { + String key = keyFields.get(i); + DataType dataType = dataTypesByName.get(key); + Integer keyUserFieldPosition = userFieldPositionsByName.get(key); + if (dataType == null || keyUserFieldPosition == null) { + throw new IllegalArgumentException( + "Can not find Hudi record key field " + key + "."); + } + keyTypes.add(dataType); + keyUserFieldPositions[i] = keyUserFieldPosition; + } + return new RecordKeyInfo(userDataTypes, keyTypes, keyUserFieldPositions); + } + + private static DataType getDataType(Schema.UnresolvedColumn column) { + AbstractDataType dataType; + if (column instanceof Schema.UnresolvedPhysicalColumn) { + dataType = ((Schema.UnresolvedPhysicalColumn) column).getDataType(); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + dataType = ((Schema.UnresolvedMetadataColumn) column).getDataType(); + } else { + throw new IllegalStateException("Unexpected column kind: " + column.getClass()); + } + if (dataType instanceof DataType) { + return (DataType) dataType; + } + throw new IllegalArgumentException("Unsupported Hudi column data type: " + dataType); + } + + private static int findProducedPosition(int originalPosition, @Nullable int[][] project) { + if (project == null) { + return originalPosition; + } + for (int i = 0; i < project.length; i++) { + if (project[i].length > 0 && project[i][0] == originalPosition) { + return i; + } + } + return -1; + } + + private static List producedTypes( + List userDataTypes, @Nullable int[][] project) { + if (project == null) { + return userDataTypes; + } + List producedTypes = new ArrayList<>(project.length); + for (int[] projectPath : project) { + if (projectPath.length > 0) { + producedTypes.add(userDataTypes.get(projectPath[0])); + } + } + return producedTypes; + } + + private static LogRecord copyRecord(LogRecord record, List producedTypes) { + return new GenericRecord( + record.logOffset(), + record.timestamp(), + record.getChangeType(), + copyRow(record.getRow(), producedTypes)); + } + + private static InternalRow copyRow(InternalRow row, List fieldTypes) { + GenericRow copiedRow = new GenericRow(fieldTypes.size()); + for (int i = 0; i < fieldTypes.size(); i++) { + if (row.isNullAt(i)) { + copiedRow.setField(i, null); + } else { + copiedRow.setField(i, copyField(row, i, fieldTypes.get(i).getLogicalType())); + } + } + return copiedRow; + } + + private static Object copyField(InternalRow row, int pos, LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return row.getBoolean(pos); + case TINYINT: + return row.getByte(pos); + case SMALLINT: + return row.getShort(pos); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return row.getInt(pos); + case BIGINT: + return row.getLong(pos); + case FLOAT: + return row.getFloat(pos); + case DOUBLE: + return row.getDouble(pos); + case CHAR: + case VARCHAR: + return row.getString(pos).copy(); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + Decimal decimal = + row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()); + return decimal.copy(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) logicalType; + return row.getTimestampNtz(pos, timestampType.getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType timestampLtzType = (LocalZonedTimestampType) logicalType; + return row.getTimestampLtz(pos, timestampLtzType.getPrecision()); + case BINARY: + case VARBINARY: + byte[] bytes = row.getBytes(pos); + return Arrays.copyOf(bytes, bytes.length); + default: + throw new IllegalArgumentException( + "Unsupported Hudi row field type: " + logicalType); + } + } + + private static int compareRows( + InternalRow row1, InternalRow row2, int[] keyPositions, List keyTypes) { + for (int pos = 0; pos < keyTypes.size(); pos++) { + DataType keyType = keyTypes.get(pos); + int keyPosition = keyPositions[pos]; + checkState( + !row1.isNullAt(keyPosition) && !row2.isNullAt(keyPosition), + "Hudi record key field at position %s must not be null.", + keyPosition); + int result = + compareValues( + getField(row1, keyPosition, keyType), + getField(row2, keyPosition, keyType), + keyType.getLogicalType()); + if (result != 0) { + return result; + } + } + return 0; + } + + private static Object getField(InternalRow row, int pos, DataType dataType) { + LogicalType logicalType = dataType.getLogicalType(); + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return row.getBoolean(pos); + case TINYINT: + return row.getByte(pos); + case SMALLINT: + return row.getShort(pos); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return row.getInt(pos); + case BIGINT: + return row.getLong(pos); + case FLOAT: + return row.getFloat(pos); + case DOUBLE: + return row.getDouble(pos); + case CHAR: + case VARCHAR: + return row.getString(pos); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) logicalType; + return row.getTimestampNtz(pos, timestampType.getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType timestampLtzType = (LocalZonedTimestampType) logicalType; + return row.getTimestampLtz(pos, timestampLtzType.getPrecision()); + case BINARY: + case VARBINARY: + return row.getBytes(pos); + default: + throw new IllegalArgumentException( + "Unsupported Hudi primary key type: " + logicalType); + } + } + + private static int compareValues(Object value1, Object value2, LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + return Boolean.compare((boolean) value1, (boolean) value2); + case CHAR: + case VARCHAR: + return ((BinaryString) value1).compareTo((BinaryString) value2); + case VARBINARY: + return InternalRowUtils.compare(value1, value2, DataTypeRoot.BYTES); + default: + return InternalRowUtils.compare( + value1, value2, toFlussDataTypeRoot(logicalType.getTypeRoot())); + } + } + + private static DataTypeRoot toFlussDataTypeRoot( + org.apache.flink.table.types.logical.LogicalTypeRoot logicalTypeRoot) { + switch (logicalTypeRoot) { + case TINYINT: + return DataTypeRoot.TINYINT; + case SMALLINT: + return DataTypeRoot.SMALLINT; + case INTEGER: + return DataTypeRoot.INTEGER; + case BIGINT: + return DataTypeRoot.BIGINT; + case FLOAT: + return DataTypeRoot.FLOAT; + case DOUBLE: + return DataTypeRoot.DOUBLE; + case DECIMAL: + return DataTypeRoot.DECIMAL; + case DATE: + return DataTypeRoot.DATE; + case TIME_WITHOUT_TIME_ZONE: + return DataTypeRoot.TIME_WITHOUT_TIME_ZONE; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE; + case BINARY: + return DataTypeRoot.BINARY; + default: + throw new IllegalArgumentException( + "Unsupported Hudi primary key type: " + logicalTypeRoot); + } + } + + private static class SortOrder { + private final List keyTypes; + private final int[] keyPositionsInRecord; + private final int[] keyPositionsInKey; + private final List producedTypes; + + private SortOrder( + List keyTypes, int[] keyPositionsInRecord, List producedTypes) { + this.keyTypes = keyTypes; + this.keyPositionsInRecord = keyPositionsInRecord; + this.producedTypes = producedTypes; + this.keyPositionsInKey = new int[keyTypes.size()]; + for (int i = 0; i < keyTypes.size(); i++) { + this.keyPositionsInKey[i] = i; + } + } + } + + private static class RecordKeyInfo { + private final List userDataTypes; + private final List keyTypes; + private final int[] keyUserFieldPositions; + + private RecordKeyInfo( + List userDataTypes, + List keyTypes, + int[] keyUserFieldPositions) { + this.userDataTypes = userDataTypes; + this.keyTypes = keyTypes; + this.keyUserFieldPositions = keyUserFieldPositions; + } + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/flink/FlinkUnionReadPrimaryKeyTableITCase.java new file mode 100644 index 0000000000..dfb8b22d27 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.flink; + +import org.apache.fluss.config.AutoPartitionTimeUnit; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.server.replica.Replica; +import org.apache.fluss.types.DataTypes; + +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.annotation.Nullable; + +import java.io.File; +import java.math.BigDecimal; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder; +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test union read primary key table with Hudi. */ +class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase { + + @TempDir public static File savepointDir; + + @BeforeAll + protected static void beforeAll() { + FlinkUnionReadTestBase.beforeAll(); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testUnionReadFullType(boolean isPartitioned) throws Exception { + JobClient jobClient = buildCheckpointedTieringJob(); + + String tableName = "pk_table_full_" + (isPartitioned ? "partitioned" : "non_partitioned"); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + Map bucketLogEndOffset = new HashMap<>(); + long tableId = + prepareFullTypePkTable( + tablePath, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset); + waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, isPartitioned); + assertReplicaStatus(bucketLogEndOffset); + + List partitions = getPartitions(tablePath, isPartitioned); + List expectedRows = createExpectedInitialFullTypeRows(partitions); + assertThat(normalizeUpsertKind(toSortedRows("select * from " + tableName))) + .isEqualTo(toSortedStrings(expectedRows)); + + String queryFilter = "c4 = 30"; + String partitionName = isPartitioned ? partitions.get(0) : null; + if (partitionName != null) { + queryFilter = queryFilter + " and c16 = '" + partitionName + "'"; + } + List expectedPointQueryRows = + expectedRows.stream() + .filter( + row -> { + boolean matches = row.getField(3).equals(30); + if (partitionName != null) { + matches = matches && row.getField(15).equals(partitionName); + } + return matches; + }) + .map(Row::toString) + .sorted() + .collect(Collectors.toList()); + + assertThat(toSortedRows("select * from " + tableName + " where " + queryFilter)) + .isEqualTo(expectedPointQueryRows); + + jobClient.cancel().get(); + + for (String partition : partitions) { + writeFullTypeUpdateRow(tablePath, partition); + } + + expectedRows = createExpectedUpdatedFullTypeRows(partitions); + assertThat(normalizeUpsertKind(toSortedRows("select * from " + tableName))) + .isEqualTo(toSortedStrings(expectedRows)); + + List expectedProjectRows = + expectedRows.stream() + .map(row -> Row.of(row.getField(2), row.getField(3))) + .collect(Collectors.toList()); + assertThat(toSortedRows("select c3, c4 from " + tableName)) + .isEqualTo(toSortedStrings(expectedProjectRows)); + + List expectedProjectRows2 = + expectedRows.stream() + .map(row -> Row.of(row.getField(2))) + .collect(Collectors.toList()); + assertThat(toSortedRows("select c3 from " + tableName)) + .isEqualTo(toSortedStrings(expectedProjectRows2)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testUnionReadInStreamMode(boolean isPartitioned) throws Exception { + JobClient jobClient = buildCheckpointedTieringJob(); + + String tableName = + "stream_pk_table_full_" + (isPartitioned ? "partitioned" : "non_partitioned"); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + Map bucketLogEndOffset = new HashMap<>(); + long tableId = + prepareFullTypePkTable( + tablePath, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset); + waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, isPartitioned); + assertReplicaStatus(bucketLogEndOffset); + + List partitions = getPartitions(tablePath, isPartitioned); + List expectedRows = createExpectedInitialFullTypeRows(partitions); + + String query = "select * from " + tableName; + CloseableIterator actual = streamTEnv.executeSql(query).collect(); + assertRowResultsIgnoreOrder(actual, expectedRows, false); + + jobClient.cancel().get(); + + for (String partition : partitions) { + writeFullTypeUpdateRow(tablePath, partition); + } + + List expectedUpdateRows = createExpectedFullTypeUpdateChangelogRows(partitions); + if (isPartitioned) { + assertRowResultsIgnoreOrder(actual, expectedUpdateRows, true); + } else { + assertResultsExactOrder(actual, expectedUpdateRows, true); + } + + actual = streamTEnv.executeSql(query).collect(); + List totalExpectedRows = new ArrayList<>(expectedRows); + totalExpectedRows.addAll(expectedUpdateRows); + if (isPartitioned) { + assertRowResultsIgnoreOrder(actual, totalExpectedRows, true); + } else { + assertResultsExactOrder(actual, totalExpectedRows, true); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exception { + JobClient jobClient = buildCheckpointedTieringJob(); + + String tableName = + "restore_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned"); + String resultTableName = + "result_pk_table_" + (isPartitioned ? "partitioned" : "non_partitioned"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + TablePath resultTablePath = TablePath.of(DEFAULT_DB, resultTableName); + Map bucketLogEndOffset = new HashMap<>(); + Function> rowGenerator = + partition -> + Arrays.asList( + row(3, "string", partition), row(30, "another_string", partition)); + long tableId = + prepareSimplePkTable( + tablePath, + DEFAULT_BUCKET_NUM, + isPartitioned, + rowGenerator, + bucketLogEndOffset); + waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, isPartitioned); + assertReplicaStatus(bucketLogEndOffset); + + createSimplePkTable(resultTablePath, DEFAULT_BUCKET_NUM, isPartitioned, false); + StreamTableEnvironment streamTEnv = buildStreamTEnv(null); + TableResult insertResult = + streamTEnv.executeSql( + "insert into " + resultTableName + " select * from " + tableName); + + List partitions = getPartitions(tablePath, isPartitioned); + List expectedRows = createExpectedSimpleRows(partitions, false); + CloseableIterator actual = + streamTEnv.executeSql("select * from " + resultTableName).collect(); + if (isPartitioned) { + assertRowResultsIgnoreOrder(actual, expectedRows, false); + } else { + assertResultsExactOrder(actual, expectedRows, false); + } + + String savepointPath = + insertResult + .getJobClient() + .get() + .stopWithSavepoint( + false, + savepointDir.getAbsolutePath(), + SavepointFormatType.CANONICAL) + .get(); + + streamTEnv = buildStreamTEnv(savepointPath); + insertResult = + streamTEnv.executeSql( + "insert into " + resultTableName + " select * from " + tableName); + + for (String partition : partitions) { + writeRows( + tablePath, + Collections.singletonList(row(30, "another_string_2", partition)), + false); + } + + List expectedUpdateRows = createExpectedSimpleRows(partitions, true); + if (isPartitioned) { + assertRowResultsIgnoreOrder(actual, expectedUpdateRows, true); + } else { + assertResultsExactOrder(actual, expectedUpdateRows, true); + } + + insertResult.getJobClient().get().cancel().get(); + jobClient.cancel().get(); + } + + private JobClient buildCheckpointedTieringJob() throws Exception { + execEnv.enableCheckpointing(1000); + return buildTieringJob(execEnv); + } + + private long prepareFullTypePkTable( + TablePath tablePath, + int bucketNum, + boolean isPartitioned, + Map bucketLogEndOffset) + throws Exception { + long tableId = createFullTypePkTable(tablePath, bucketNum, isPartitioned); + if (isPartitioned) { + Map partitionNameById = waitUntilPartitions(tablePath); + for (String partition : partitionNameById.values()) { + writeRows(tablePath, createInternalFullTypeRows(partition), false); + } + for (Long partitionId : partitionNameById.keySet()) { + bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, bucketNum, partitionId)); + } + } else { + writeRows(tablePath, createInternalFullTypeRows(null), false); + bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, bucketNum, null)); + } + return tableId; + } + + private long prepareSimplePkTable( + TablePath tablePath, + int bucketNum, + boolean isPartitioned, + Function> rowGenerator, + Map bucketLogEndOffset) + throws Exception { + long tableId = createSimplePkTable(tablePath, bucketNum, isPartitioned, true); + if (isPartitioned) { + Map partitionNameById = waitUntilPartitions(tablePath); + for (String partition : partitionNameById.values()) { + writeRows(tablePath, rowGenerator.apply(partition), false); + } + for (Long partitionId : partitionNameById.keySet()) { + bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, bucketNum, partitionId)); + } + } else { + writeRows(tablePath, rowGenerator.apply(null), false); + bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, bucketNum, null)); + } + return tableId; + } + + private Map getBucketLogEndOffset( + long tableId, int bucketNum, @Nullable Long partitionId) { + Map bucketLogEndOffsets = new HashMap<>(); + for (int i = 0; i < bucketNum; i++) { + TableBucket tableBucket = new TableBucket(tableId, partitionId, i); + Replica replica = getLeaderReplica(tableBucket); + bucketLogEndOffsets.put(tableBucket, replica.getLocalLogEndOffset()); + } + return bucketLogEndOffsets; + } + + private long createFullTypePkTable(TablePath tablePath, int bucketNum, boolean isPartitioned) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("c1", DataTypes.BOOLEAN()) + .column("c2", DataTypes.INT()) + .column("c3", DataTypes.INT()) + .column("c4", DataTypes.INT()) + .column("c5", DataTypes.BIGINT()) + .column("c6", DataTypes.FLOAT()) + .column("c7", DataTypes.DOUBLE()) + .column("c8", DataTypes.STRING()) + .column("c9", DataTypes.DECIMAL(5, 2)) + .column("c10", DataTypes.DECIMAL(20, 0)) + .column("c11", DataTypes.TIMESTAMP_LTZ(6)) + .column("c12", DataTypes.TIMESTAMP_LTZ(6)) + .column("c13", DataTypes.TIMESTAMP(3)) + .column("c14", DataTypes.TIMESTAMP(6)) + .column("c15", DataTypes.BINARY(4)) + .column("c16", DataTypes.STRING()); + return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c16"); + } + + private long createSimplePkTable( + TablePath tablePath, int bucketNum, boolean isPartitioned, boolean lakeEnabled) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()); + return createPkTable( + tablePath, bucketNum, isPartitioned, lakeEnabled, schemaBuilder, "c1", "c3"); + } + + private long createPkTable( + TablePath tablePath, + int bucketNum, + boolean isPartitioned, + boolean lakeEnabled, + Schema.Builder schemaBuilder, + String primaryKey, + String partitionKey) + throws Exception { + TableDescriptor.Builder tableBuilder = TableDescriptor.builder().distributedBy(bucketNum); + if (lakeEnabled) { + tableBuilder + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) + .customProperty(HUDI_CONF_PREFIX + "precombine.field", primaryKey); + } + + if (isPartitioned) { + tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); + tableBuilder.partitionedBy(partitionKey); + tableBuilder.property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); + schemaBuilder.primaryKey(primaryKey, partitionKey); + } else { + schemaBuilder.primaryKey(primaryKey); + } + tableBuilder.schema(schemaBuilder.build()); + return createTable(tablePath, tableBuilder.build()); + } + + private void writeFullTypeUpdateRow(TablePath tablePath, @Nullable String partition) + throws Exception { + writeRows( + tablePath, + Collections.singletonList( + row( + true, + 100, + 200, + 30, + 400L, + 500.1f, + 600.0d, + "another_string_2", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + partition)), + false); + } + + private static List createInternalFullTypeRows(@Nullable String partition) { + return Arrays.asList( + row( + false, + 1, + 2, + 3, + 4L, + 5.1f, + 6.0d, + "string", + Decimal.fromUnscaledLong(9, 5, 2), + Decimal.fromBigDecimal(new BigDecimal(10), 20, 0), + TimestampLtz.fromEpochMillis(1698235273182L), + TimestampLtz.fromEpochMillis(1698235273182L, 5000), + TimestampNtz.fromMillis(1698235273183L), + TimestampNtz.fromMillis(1698235273183L, 6000), + new byte[] {1, 2, 3, 4}, + partition), + row( + true, + 10, + 20, + 30, + 40L, + 50.1f, + 60.0d, + "another_string", + Decimal.fromUnscaledLong(90, 5, 2), + Decimal.fromBigDecimal(new BigDecimal(100), 20, 0), + TimestampLtz.fromEpochMillis(1698235273200L), + TimestampLtz.fromEpochMillis(1698235273200L, 5000), + TimestampNtz.fromMillis(1698235273201L), + TimestampNtz.fromMillis(1698235273201L, 6000), + new byte[] {1, 2, 3, 4}, + partition)); + } + + private List createExpectedInitialFullTypeRows(List partitions) { + List rows = new ArrayList<>(); + for (String partition : partitions) { + rows.add(createInitialFullTypeRow(partition)); + rows.add(createInitialFullTypePk30Row(RowKind.INSERT, partition)); + } + return rows; + } + + private List createExpectedUpdatedFullTypeRows(List partitions) { + List rows = new ArrayList<>(); + for (String partition : partitions) { + rows.add(createInitialFullTypeRow(partition)); + rows.add(createUpdatedFullTypePk30Row(RowKind.INSERT, partition)); + } + return rows; + } + + private List createExpectedFullTypeUpdateChangelogRows(List partitions) { + List rows = new ArrayList<>(); + for (String partition : partitions) { + rows.add(createInitialFullTypePk30Row(RowKind.UPDATE_BEFORE, partition)); + rows.add(createUpdatedFullTypePk30Row(RowKind.UPDATE_AFTER, partition)); + } + return rows; + } + + private Row createInitialFullTypeRow(@Nullable String partition) { + return Row.of( + false, + 1, + 2, + 3, + 4L, + 5.1f, + 6.0d, + "string", + new BigDecimal("0.09"), + new BigDecimal("10"), + Instant.ofEpochMilli(1698235273182L), + Instant.ofEpochMilli(1698235273182L).plusNanos(5000), + LocalDateTime.ofInstant(Instant.ofEpochMilli(1698235273183L), ZoneId.of("UTC")), + LocalDateTime.ofInstant(Instant.ofEpochMilli(1698235273183L), ZoneId.of("UTC")) + .plusNanos(6000), + new byte[] {1, 2, 3, 4}, + partition); + } + + private Row createInitialFullTypePk30Row(RowKind rowKind, @Nullable String partition) { + Row row = createInitialFullTypePk30Row(partition); + row.setKind(rowKind); + return row; + } + + private Row createInitialFullTypePk30Row(@Nullable String partition) { + return Row.of( + true, + 10, + 20, + 30, + 40L, + 50.1f, + 60.0d, + "another_string", + new BigDecimal("0.90"), + new BigDecimal("100"), + Instant.ofEpochMilli(1698235273200L), + Instant.ofEpochMilli(1698235273200L).plusNanos(5000), + LocalDateTime.ofInstant(Instant.ofEpochMilli(1698235273201L), ZoneId.of("UTC")), + LocalDateTime.ofInstant(Instant.ofEpochMilli(1698235273201L), ZoneId.of("UTC")) + .plusNanos(6000), + new byte[] {1, 2, 3, 4}, + partition); + } + + private Row createUpdatedFullTypePk30Row(RowKind rowKind, @Nullable String partition) { + Row row = createUpdatedFullTypePk30Row(partition); + row.setKind(rowKind); + return row; + } + + private Row createUpdatedFullTypePk30Row(@Nullable String partition) { + return Row.of( + true, + 100, + 200, + 30, + 400L, + 500.1f, + 600.0d, + "another_string_2", + new BigDecimal("9.00"), + new BigDecimal("1000"), + Instant.ofEpochMilli(1698235273400L), + Instant.ofEpochMilli(1698235273400L).plusNanos(7000), + LocalDateTime.ofInstant(Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")), + LocalDateTime.ofInstant(Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")) + .plusNanos(8000), + new byte[] {5, 6, 7, 8}, + partition); + } + + private List createExpectedSimpleRows(List partitions, boolean updateOnly) { + List rows = new ArrayList<>(); + for (String partition : partitions) { + if (updateOnly) { + rows.add(Row.ofKind(RowKind.UPDATE_BEFORE, 30, "another_string", partition)); + rows.add(Row.ofKind(RowKind.UPDATE_AFTER, 30, "another_string_2", partition)); + } else { + rows.add(Row.of(3, "string", partition)); + rows.add(Row.of(30, "another_string", partition)); + } + } + return rows; + } + + private List getPartitions(TablePath tablePath, boolean isPartitioned) { + if (!isPartitioned) { + return Collections.singletonList(null); + } + Map partitionNameById = waitUntilPartitions(tablePath); + return partitionNameById.values().stream().sorted().collect(Collectors.toList()); + } + + private List toSortedRows(String sql) { + return CollectionUtil.iteratorToList(batchTEnv.executeSql(sql).collect()).stream() + .map(Row::toString) + .sorted() + .collect(Collectors.toList()); + } + + private List toSortedStrings(List rows) { + return rows.stream() + .sorted(Comparator.comparing(Row::toString)) + .map(Row::toString) + .collect(Collectors.toList()); + } + + private List normalizeUpsertKind(List rows) { + return rows.stream().map(row -> row.replace("+U", "+I")).collect(Collectors.toList()); + } +} diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiLakeSourceTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiLakeSourceTest.java index 5d8bbdf298..d4ccfde268 100644 --- a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiLakeSourceTest.java +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiLakeSourceTest.java @@ -18,11 +18,22 @@ package org.apache.fluss.lake.hudi.source; import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.hudi.HudiLakeCatalog; +import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; import org.apache.fluss.lake.source.LakeSource; +import org.apache.fluss.lake.source.RecordReader; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; @@ -31,6 +42,8 @@ /** Test for {@link HudiLakeSource}. */ class HudiLakeSourceTest { + @TempDir private File tempWarehouseDir; + @Test void testCreatePlannerReturnsHudiSplitPlanner() throws Exception { HudiLakeSource source = @@ -67,4 +80,88 @@ void testWithLimitIsExplicitlyUnsupported() { .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("limit"); } + + @Test + void testReaderContextDoesNotRequireSortedRecordsByDefault() { + LakeSource.ReaderContext context = () -> null; + + assertThat(context.requireSortedRecords()).isFalse(); + } + + @Test + void testCreateRecordReaderUsesSortedReaderWhenRequired() throws Exception { + Configuration hudiConfig = hudiConfig(); + TablePath tablePath = TablePath.of("db1", "sorted_required_pk_table"); + createPkTable(hudiConfig, tablePath); + + HudiLakeSource source = new HudiLakeSource(hudiConfig, tablePath); + + RecordReader recordReader = source.createRecordReader(sortedReaderContext(null)); + + assertThat(recordReader).isInstanceOf(HudiSortedRecordReader.class); + } + + @Test + void testCreateRecordReaderFailsWhenSortedReaderCanNotKeepRecordKeyOrder() throws Exception { + Configuration hudiConfig = hudiConfig(); + TablePath tablePath = TablePath.of("db1", "sorted_required_projected_pk_table"); + createPkTable(hudiConfig, tablePath); + + HudiLakeSource source = new HudiLakeSource(hudiConfig, tablePath); + source.withProject(project(1)); + + assertThatThrownBy(() -> source.createRecordReader(sortedReaderContext(null))) + .isInstanceOf(IOException.class) + .hasMessageContaining("Fail to create Hudi record reader") + .hasCauseInstanceOf(UnsupportedOperationException.class); + } + + private Configuration hudiConfig() { + Configuration hudiConfig = new Configuration(); + hudiConfig.setString("catalog.path", tempWarehouseDir.toURI().toString()); + hudiConfig.setString("mode", "dfs"); + return hudiConfig; + } + + private static LakeSource.ReaderContext sortedReaderContext( + @Nullable HudiSplit hudiSplit) { + return new LakeSource.ReaderContext() { + @Nullable + @Override + public HudiSplit lakeSplit() { + return hudiSplit; + } + + @Override + public boolean requireSortedRecords() { + return true; + } + }; + } + + private static void createPkTable(Configuration hudiConfig, TablePath tablePath) + throws Exception { + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build()) + .distributedBy(4) + .property("hudi.precombine.field", "id") + .build(); + try (HudiLakeCatalog catalog = new HudiLakeCatalog(hudiConfig)) { + catalog.createTable(tablePath, tableDescriptor, new TestingLakeCatalogContext()); + } + } + + private static int[][] project(int... fields) { + int[][] project = new int[fields.length][]; + for (int i = 0; i < fields.length; i++) { + project[i] = new int[] {fields[i]}; + } + return project; + } } diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReaderTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReaderTest.java new file mode 100644 index 0000000000..7e814f4755 --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSortedRecordReaderTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.hudi.source; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.hudi.HudiLakeCatalog; +import org.apache.fluss.lake.hudi.utils.HudiTableInfo; +import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link HudiSortedRecordReader}. */ +class HudiSortedRecordReaderTest { + + @TempDir private File tempWarehouseDir; + + private Configuration hudiConfig; + + @BeforeEach + void setUp() { + hudiConfig = new Configuration(); + hudiConfig.setString("catalog.path", tempWarehouseDir.toURI().toString()); + hudiConfig.setString("mode", "dfs"); + } + + @Test + void testCanSortByRecordKeyRequiresProjectedKey() throws Exception { + TablePath tablePath = TablePath.of("db1", "single_pk_table"); + createTable( + tablePath, + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(), + TableDescriptor.builder().distributedBy(4)); + + try (HudiTableInfo hudiTableInfo = HudiTableInfo.create(tablePath, hudiConfig)) { + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, null)).isTrue(); + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project(0))) + .isTrue(); + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project(1))) + .isFalse(); + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project(1, 0))) + .isTrue(); + } + } + + @Test + void testCanSortByRecordKeyRequiresAllProjectedKeys() throws Exception { + TablePath tablePath = TablePath.of("db1", "composite_pk_table"); + createTable( + tablePath, + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("dt", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .primaryKey("id", "dt") + .build(), + TableDescriptor.builder().distributedBy(4).partitionedBy("dt")); + + try (HudiTableInfo hudiTableInfo = HudiTableInfo.create(tablePath, hudiConfig)) { + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project(0, 1))) + .isTrue(); + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project(0))) + .isFalse(); + assertThat(HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project(1))) + .isFalse(); + } + } + + private void createTable( + TablePath tablePath, Schema schema, TableDescriptor.Builder tableBuilder) + throws Exception { + TableDescriptor tableDescriptor = + tableBuilder.schema(schema).property("hudi.precombine.field", "id").build(); + try (HudiLakeCatalog catalog = new HudiLakeCatalog(hudiConfig)) { + catalog.createTable(tablePath, tableDescriptor, new TestingLakeCatalogContext()); + } + } + + private static int[][] project(int... fields) { + int[][] project = new int[fields.length][]; + for (int i = 0; i < fields.length; i++) { + project[i] = new int[] {fields[i]}; + } + return project; + } +}