Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ private void initializeLakeRecordIterators() throws IOException {
List<RecordReader> recordReaders = new ArrayList<>();
if (lakeSplits == null || lakeSplits.isEmpty()) {
// pass null split to get rowComparator
recordReaders.add(lakeSource.createRecordReader(() -> null));
recordReaders.add(lakeSource.createRecordReader(sortedReaderContext(null)));
} else {
for (LakeSplit lakeSplit : lakeSplits) {
recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit));
recordReaders.add(lakeSource.createRecordReader(sortedReaderContext(lakeSplit)));
}
}
for (RecordReader reader : recordReaders) {
Expand All @@ -206,6 +206,21 @@ private void initializeLakeRecordIterators() throws IOException {
lakeRecordIteratorsInitialized = true;
}

private LakeSource.ReaderContext<LakeSplit> sortedReaderContext(@Nullable LakeSplit lakeSplit) {
return new LakeSource.ReaderContext<LakeSplit>() {
@Nullable
@Override
public LakeSplit lakeSplit() {
return lakeSplit;
}

@Override
public boolean requireSortedRecords() {
return true;
}
};
}

private void pollLogRecords(Duration timeout) {
ScanRecords scanRecords = logScanner.poll(timeout);
for (ScanRecord scanRecord : scanRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.predicate.Predicate;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
Expand Down Expand Up @@ -91,12 +93,28 @@ interface PlannerContext extends Serializable {
}

/**
* Context interface for record readers, providing access to the lake split being read.
* Context interface for record readers, providing access to the lake split being read and the
* required reading semantics.
*
* @param <Split> The type of lake split
*/
interface ReaderContext<Split extends LakeSplit> extends Serializable {
/**
* Returns the lake split to read.
*
* <p>The split can be null when the caller only needs reader-level metadata, such as a sort
* comparator.
*/
@Nullable
Split lakeSplit();

/**
* Returns whether records produced by this reader must follow the order defined by {@link
* SortedRecordReader#order()}.
*/
default boolean requireSortedRecords() {
return false;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.source.ExpressionPredicates;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused Import: The constructor receives a predicates parameter but it is not used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already used: predicates is passed into HudiRecordReader when creating the delegate reader, so I kept it unchanged.

import org.apache.hudi.util.StreamerUtil;
Expand Down Expand Up @@ -102,6 +103,17 @@ public Planner<HudiSplit> createPlanner(PlannerContext context) throws IOExcepti
@Override
public RecordReader createRecordReader(ReaderContext<HudiSplit> context) throws IOException {
try {
if (context.requireSortedRecords()) {
if (!shouldUseSortedRecordReader()) {
throw new UnsupportedOperationException(
"Hudi lake source can not provide sorted records for "
+ tablePath
+ ". Sorted records are only supported for MERGE_ON_READ tables"
+ " whose projection contains all Hudi record key fields.");
}
return new HudiSortedRecordReader(
hudiConfig, tablePath, context.lakeSplit(), project, predicates);
}
return new HudiRecordReader(
hudiConfig, tablePath, context.lakeSplit(), project, predicates);
} catch (Exception e) {
Expand All @@ -121,4 +133,11 @@ private Schema getHudiSchema() {
throw new RuntimeException("Fail to get Hudi schema for " + tablePath + ".", e);
}
}

private boolean shouldUseSortedRecordReader() throws IOException {
try (HudiTableInfo hudiTableInfo = HudiTableInfo.create(tablePath, hudiConfig)) {
return hudiTableInfo.getTableType() == HoodieTableType.MERGE_ON_READ
&& HudiSortedRecordReader.canSortByRecordKey(hudiTableInfo, project);
}
}
}
Loading
Loading