diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index aa051cd14..33889ee88 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -188,6 +188,7 @@ set(ICEBERG_DATA_SOURCES data/data_writer.cc data/delete_filter.cc data/delete_loader.cc + data/deletion_vector_writer.cc data/equality_delete_writer.cc data/file_scan_task_reader.cc data/position_delete_writer.cc diff --git a/src/iceberg/data/delete_loader.cc b/src/iceberg/data/delete_loader.cc index 922173401..4167fa1fe 100644 --- a/src/iceberg/data/delete_loader.cc +++ b/src/iceberg/data/delete_loader.cc @@ -19,9 +19,12 @@ #include "iceberg/data/delete_loader.h" +#include #include +#include #include #include +#include #include #include @@ -30,6 +33,7 @@ #include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/deletes/position_delete_index.h" #include "iceberg/deletes/position_delete_range_consumer.h" +#include "iceberg/file_io.h" #include "iceberg/file_reader.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/metadata_columns.h" @@ -171,7 +175,44 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde } Status DeleteLoader::LoadDV(const DataFile& file, PositionDeleteIndex& index) const { - return NotSupported("Loading deletion vectors is not yet supported"); + // A deletion vector must reference exactly one data file; without it the + // caller cannot know which data file the positions apply to. + ICEBERG_PRECHECK(file.referenced_data_file.has_value(), + "Deletion vector requires referenced_data_file: {}", file.file_path); + + // For deletion vectors, content_offset and content_size_in_bytes point directly + // at the DV blob bytes within the Puffin file and are required by the spec. + ICEBERG_PRECHECK( + file.content_offset.has_value() && file.content_size_in_bytes.has_value(), + "Deletion vector requires content_offset and content_size_in_bytes: {}", + file.file_path); + + const int64_t offset = file.content_offset.value(); + const int64_t length = file.content_size_in_bytes.value(); + ICEBERG_PRECHECK(offset >= 0 && length >= 0, + "Invalid deletion vector offset/length: offset={}, length={}", offset, + length); + ICEBERG_PRECHECK(length <= std::numeric_limits::max(), + "Cannot read deletion vector larger than 2GB: {}", length); + + ICEBERG_ASSIGN_OR_RAISE(auto input_file, io_->NewInputFile(file.file_path)); + ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open()); + + std::vector bytes(static_cast(length)); + ICEBERG_RETURN_UNEXPECTED(stream->ReadFully(offset, bytes)); + ICEBERG_RETURN_UNEXPECTED(stream->Close()); + + std::span blob(reinterpret_cast(bytes.data()), + bytes.size()); + ICEBERG_ASSIGN_OR_RAISE(auto dv, PositionDeleteIndex::Deserialize(blob)); + + // The bitmap cardinality must match the record count recorded in metadata. + ICEBERG_PRECHECK(std::cmp_equal(dv.Cardinality(), file.record_count), + "Deletion vector cardinality {} does not match record count {}: {}", + dv.Cardinality(), file.record_count, file.file_path); + + index.Merge(dv); + return {}; } Result DeleteLoader::LoadPositionDeletes( diff --git a/src/iceberg/data/deletion_vector_writer.cc b/src/iceberg/data/deletion_vector_writer.cc new file mode 100644 index 000000000..40df88b8e --- /dev/null +++ b/src/iceberg/data/deletion_vector_writer.cc @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/deletion_vector_writer.h" + +#include +#include +#include +#include +#include +#include + +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/partition_spec.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/util/content_file_util.h" +#include "iceberg/util/macros.h" +#include "iceberg/version.h" + +namespace iceberg { + +namespace { +constexpr std::string_view kReferencedDataFile = "referenced-data-file"; +constexpr std::string_view kCardinality = "cardinality"; +} // namespace + +class DeletionVectorWriter::Impl { + public: + explicit Impl(DeletionVectorWriterOptions options) : options_(std::move(options)) {} + + // Accumulated positions and metadata for a single referenced data file. + struct Deletes { + PositionDeleteIndex positions; + std::shared_ptr spec; + PartitionValues partition; + }; + + Deletes& DeletesFor(std::string_view referenced_data_file, + std::shared_ptr spec, PartitionValues partition) { + auto [it, inserted] = deletes_by_path_.try_emplace(std::string(referenced_data_file)); + if (inserted) { + it->second.spec = std::move(spec); + it->second.partition = std::move(partition); + } + return it->second; + } + + Status Delete(std::string_view referenced_data_file, int64_t pos, + std::shared_ptr spec, PartitionValues partition) { + ICEBERG_CHECK(!closed_, "Cannot delete after the writer is closed"); + ICEBERG_PRECHECK(!referenced_data_file.empty(), + "Deletion vector requires a non-empty referenced data file"); + DeletesFor(referenced_data_file, std::move(spec), std::move(partition)) + .positions.Delete(pos); + return {}; + } + + Status Delete(std::string_view referenced_data_file, + const PositionDeleteIndex& positions, std::shared_ptr spec, + PartitionValues partition) { + ICEBERG_CHECK(!closed_, "Cannot delete after the writer is closed"); + ICEBERG_PRECHECK(!referenced_data_file.empty(), + "Deletion vector requires a non-empty referenced data file"); + DeletesFor(referenced_data_file, std::move(spec), std::move(partition)) + .positions.Merge(positions); + return {}; + } + + Status Close() { + if (closed_) { + return {}; + } + + // No deletes: skip creating an orphan Puffin file that no metadata + // references. + if (deletes_by_path_.empty()) { + closed_ = true; + return {}; + } + + // Merge previously written deletes and collect the file-scoped delete files + // they came from so callers can remove them from table state. + if (options_.load_previous_deletes) { + for (auto& [path, deletes] : deletes_by_path_) { + ICEBERG_ASSIGN_OR_RAISE(auto previous, options_.load_previous_deletes(path)); + if (previous.index == nullptr) { + continue; + } + deletes.positions.Merge(*previous.index); + for (const auto& delete_file : previous.delete_files) { + ICEBERG_ASSIGN_OR_RAISE(bool file_scoped, + ContentFileUtil::IsFileScoped(*delete_file)); + if (file_scoped) { + result_.rewritten_delete_files.push_back(delete_file); + } + } + } + } + + auto properties = options_.properties; + if (const std::string created_by(puffin::StandardPuffinProperties::kCreatedBy); + !properties.contains(created_by)) { + properties.emplace(created_by, + std::format("iceberg-cpp/{}.{}.{}", ICEBERG_VERSION_MAJOR, + ICEBERG_VERSION_MINOR, ICEBERG_VERSION_PATCH)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto output_file, options_.io->NewOutputFile(options_.path)); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + puffin::PuffinWriter::Make(std::move(output_file), std::move(properties))); + + struct Entry { + std::string referenced_data_file; + std::shared_ptr spec; + PartitionValues partition; + int64_t offset; + int64_t length; + int64_t cardinality; + }; + std::vector entries; + entries.reserve(deletes_by_path_.size()); + + for (auto& [path, deletes] : deletes_by_path_) { + const int64_t cardinality = deletes.positions.Cardinality(); + ICEBERG_ASSIGN_OR_RAISE(auto data, deletes.positions.Serialize()); + + puffin::Blob blob{ + .type = std::string(puffin::StandardBlobTypes::kDeletionVectorV1), + .input_fields = {MetadataColumns::kFilePositionColumnId}, + // Snapshot ID and sequence number are inherited; the spec requires -1. + .snapshot_id = -1, + .sequence_number = -1, + .data = std::move(data), + .requested_compression = puffin::PuffinCompressionCodec::kNone, + }; + blob.properties.emplace(std::string(kReferencedDataFile), path); + blob.properties.emplace(std::string(kCardinality), std::format("{}", cardinality)); + + ICEBERG_ASSIGN_OR_RAISE(auto blob_metadata, writer->Write(blob)); + entries.push_back(Entry{ + .referenced_data_file = path, + .spec = deletes.spec, + .partition = deletes.partition, + .offset = blob_metadata.offset, + .length = blob_metadata.length, + .cardinality = cardinality, + }); + } + + ICEBERG_RETURN_UNEXPECTED(writer->Finish()); + ICEBERG_ASSIGN_OR_RAISE(const int64_t file_size, writer->FileSize()); + + for (auto& entry : entries) { + result_.referenced_data_files.push_back(entry.referenced_data_file); + result_.delete_files.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = options_.path, + .file_format = FileFormatType::kPuffin, + .partition = std::move(entry.partition), + .record_count = entry.cardinality, + .file_size_in_bytes = file_size, + .referenced_data_file = std::move(entry.referenced_data_file), + .content_offset = entry.offset, + .content_size_in_bytes = entry.length, + .partition_spec_id = + entry.spec ? std::make_optional(entry.spec->spec_id()) : std::nullopt, + })); + } + + closed_ = true; + return {}; + } + + Result Metadata() { + ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); + return result_; + } + + private: + DeletionVectorWriterOptions options_; + // Ordered by referenced data file path for deterministic blob layout. + std::map deletes_by_path_; + DeleteWriteResult result_; + bool closed_ = false; +}; + +DeletionVectorWriter::DeletionVectorWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +DeletionVectorWriter::~DeletionVectorWriter() = default; + +Result> DeletionVectorWriter::Make( + DeletionVectorWriterOptions options) { + ICEBERG_PRECHECK(options.io != nullptr, "DeletionVectorWriter requires a FileIO"); + ICEBERG_PRECHECK(!options.path.empty(), "DeletionVectorWriter requires a path"); + return std::unique_ptr( + new DeletionVectorWriter(std::make_unique(std::move(options)))); +} + +Status DeletionVectorWriter::Delete(std::string_view referenced_data_file, int64_t pos, + std::shared_ptr spec, + PartitionValues partition) { + return impl_->Delete(referenced_data_file, pos, std::move(spec), std::move(partition)); +} + +Status DeletionVectorWriter::Delete(std::string_view referenced_data_file, + const PositionDeleteIndex& positions, + std::shared_ptr spec, + PartitionValues partition) { + return impl_->Delete(referenced_data_file, positions, std::move(spec), + std::move(partition)); +} + +Status DeletionVectorWriter::Close() { return impl_->Close(); } + +Result DeletionVectorWriter::Metadata() { return impl_->Metadata(); } + +} // namespace iceberg diff --git a/src/iceberg/data/deletion_vector_writer.h b/src/iceberg/data/deletion_vector_writer.h new file mode 100644 index 000000000..957a6a7f7 --- /dev/null +++ b/src/iceberg/data/deletion_vector_writer.h @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/deletion_vector_writer.h +/// Writer that emits deletion vectors as `deletion-vector-v1` blobs in a Puffin file. + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/deletes/position_delete_index.h" +#include "iceberg/iceberg_data_export.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief The result of writing deletion vectors. +struct ICEBERG_DATA_EXPORT DeleteWriteResult { + /// Deletion vector files produced, one per referenced data file. + std::vector> delete_files; + /// Data files referenced by the produced deletion vectors. + std::vector referenced_data_files; + /// Previously written, file-scoped delete files that were merged and should be + /// removed from table state. + std::vector> rewritten_delete_files; +}; + +/// \brief Previously written deletes for a data file, used to merge before +/// writing a new deletion vector. +struct ICEBERG_DATA_EXPORT PreviousDeletes { + /// The previously deleted positions, or null if none. + std::shared_ptr index; + /// The delete files the positions came from; file-scoped ones are reported as + /// rewritten. + std::vector> delete_files; +}; + +/// \brief Options for creating a DeletionVectorWriter. +struct ICEBERG_DATA_EXPORT DeletionVectorWriterOptions { + /// Output Puffin file location. + std::string path; + /// FileIO used to create the Puffin file. + std::shared_ptr io; + /// File-level Puffin properties (e.g. "created-by"). + std::unordered_map properties; + /// Optional hook to load previously written deletes for a data file so they + /// can be merged into the new deletion vector, with the old delete files + /// reported as rewritten. + std::function(std::string_view)> load_previous_deletes; +}; + +/// \brief Writes one or more deletion vectors into a single Puffin file. +/// +/// Each referenced data file gets its own `deletion-vector-v1` blob with its own +/// partition spec and partition. After Close(), Metadata() returns the produced +/// delete files plus the referenced and rewritten delete files. +class ICEBERG_DATA_EXPORT DeletionVectorWriter { + public: + ~DeletionVectorWriter(); + + DeletionVectorWriter(const DeletionVectorWriter&) = delete; + DeletionVectorWriter& operator=(const DeletionVectorWriter&) = delete; + + /// \brief Create a new DeletionVectorWriter. + static Result> Make( + DeletionVectorWriterOptions options); + + /// \brief Mark a row position as deleted for the given data file. + Status Delete(std::string_view referenced_data_file, int64_t pos, + std::shared_ptr spec, PartitionValues partition); + + /// \brief Mark all positions in the given index as deleted for a data file. + Status Delete(std::string_view referenced_data_file, + const PositionDeleteIndex& positions, std::shared_ptr spec, + PartitionValues partition); + + /// \brief Write all accumulated deletion vectors to the Puffin file and close. + Status Close(); + + /// \brief The result of writing; valid only after Close(). + Result Metadata(); + + private: + class Impl; + std::unique_ptr impl_; + + explicit DeletionVectorWriter(std::unique_ptr impl); +}; + +} // namespace iceberg diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build index bbb26db27..eaae8a2b4 100644 --- a/src/iceberg/data/meson.build +++ b/src/iceberg/data/meson.build @@ -20,6 +20,7 @@ install_headers( 'data_writer.h', 'delete_filter.h', 'delete_loader.h', + 'deletion_vector_writer.h', 'equality_delete_writer.h', 'file_scan_task_reader.h', 'position_delete_writer.h', diff --git a/src/iceberg/deletes/position_delete_index.cc b/src/iceberg/deletes/position_delete_index.cc index f09d0b480..fedabcb74 100644 --- a/src/iceberg/deletes/position_delete_index.cc +++ b/src/iceberg/deletes/position_delete_index.cc @@ -19,8 +19,54 @@ #include "iceberg/deletes/position_delete_index.h" +#include + +#include +#include +#include +#include +#include +#include + +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + namespace iceberg { +namespace { + +// `deletion-vector-v1` blob framing. See: +// https://iceberg.apache.org/puffin-spec/#deletion-vector-v1-blob-type +constexpr std::array kMagic = {0xD1, 0xD3, 0x39, 0x64}; +constexpr int32_t kLengthPrefixBytes = 4; +constexpr int32_t kMagicBytes = 4; +constexpr int32_t kCrcBytes = 4; + +uint32_t ComputeCrc32(std::span bytes) { + uLong crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, reinterpret_cast(bytes.data()), + static_cast(bytes.size())); + return static_cast(crc); +} + +template +void WriteBigEndian(T value, uint8_t* buf) { + T be = ToBigEndian(value); + std::memcpy(buf, &be, sizeof(be)); +} + +template +T ReadBigEndian(const uint8_t* buf) { + T value; + std::memcpy(&value, buf, sizeof(value)); + return FromBigEndian(value); +} + +} // namespace + +PositionDeleteIndex::PositionDeleteIndex(RoaringPositionBitmap bitmap) + : bitmap_(std::move(bitmap)) {} + void PositionDeleteIndex::Delete(int64_t pos) { bitmap_.Add(pos); } void PositionDeleteIndex::Delete(int64_t pos_start, int64_t pos_end) { @@ -39,6 +85,81 @@ void PositionDeleteIndex::Merge(const PositionDeleteIndex& other) { bitmap_.Or(other.bitmap_); } +Result> PositionDeleteIndex::Serialize() { + bitmap_.Optimize(); // run-length encode before serializing + ICEBERG_ASSIGN_OR_RAISE(auto vector, bitmap_.Serialize()); + + // The length prefix and CRC both cover the magic sequence plus the vector. + const size_t magic_and_vector_size = static_cast(kMagicBytes) + vector.size(); + ICEBERG_PRECHECK( + magic_and_vector_size <= static_cast(std::numeric_limits::max()), + "Deletion vector is too large to serialize: {} bytes", magic_and_vector_size); + + std::vector blob(static_cast(kLengthPrefixBytes) + + magic_and_vector_size + static_cast(kCrcBytes)); + uint8_t* buf = blob.data(); + + WriteBigEndian(static_cast(magic_and_vector_size), buf); + buf += kLengthPrefixBytes; + + uint8_t* checksum_begin = buf; + std::memcpy(buf, kMagic.data(), kMagicBytes); + buf += kMagicBytes; + std::memcpy(buf, vector.data(), vector.size()); + buf += vector.size(); + + WriteBigEndian( + ComputeCrc32(std::span(checksum_begin, magic_and_vector_size)), buf); + return blob; +} + +Result PositionDeleteIndex::Deserialize( + std::span blob) { + constexpr size_t kMinSize = static_cast(kLengthPrefixBytes) + + static_cast(kMagicBytes) + + static_cast(kCrcBytes); + ICEBERG_PRECHECK(blob.size() >= kMinSize, + "Deletion vector blob too small: {} bytes, need at least {}", + blob.size(), kMinSize); + + const uint8_t* buf = blob.data(); + + const auto length = ReadBigEndian(buf); + buf += kLengthPrefixBytes; + + ICEBERG_PRECHECK(length >= kMagicBytes, "Invalid deletion vector length prefix: {}", + length); + + const size_t expected_total = static_cast(kLengthPrefixBytes) + + static_cast(length) + + static_cast(kCrcBytes); + ICEBERG_PRECHECK(blob.size() == expected_total, + "Deletion vector blob size mismatch: {} bytes, expected {}", + blob.size(), expected_total); + + // Magic and vector are checksummed together by the trailing CRC. + const uint8_t* checksum_begin = buf; + for (size_t i = 0; i < kMagic.size(); ++i) { + ICEBERG_PRECHECK(buf[i] == kMagic[i], + "Invalid deletion vector magic byte at offset {}: got {:#04x}", i, + buf[i]); + } + buf += kMagicBytes; + + const auto stored_crc = + ReadBigEndian(checksum_begin + static_cast(length)); + const uint32_t actual_crc = + ComputeCrc32(std::span(checksum_begin, static_cast(length))); + ICEBERG_PRECHECK(stored_crc == actual_crc, + "Deletion vector CRC mismatch: stored {:#010x}, computed {:#010x}", + stored_crc, actual_crc); + + const auto vector_size = static_cast(length) - kMagicBytes; + std::string_view vector_bytes(reinterpret_cast(buf), vector_size); + ICEBERG_ASSIGN_OR_RAISE(auto bitmap, RoaringPositionBitmap::Deserialize(vector_bytes)); + return PositionDeleteIndex(std::move(bitmap)); +} + void PositionDeleteIndex::BulkAddForKey(int32_t key, std::span positions) { bitmap_.AddManyForKey(key, positions); diff --git a/src/iceberg/deletes/position_delete_index.h b/src/iceberg/deletes/position_delete_index.h index 592216ed6..04274f53a 100644 --- a/src/iceberg/deletes/position_delete_index.h +++ b/src/iceberg/deletes/position_delete_index.h @@ -29,6 +29,7 @@ #include "iceberg/deletes/roaring_position_bitmap.h" #include "iceberg/iceberg_data_export.h" +#include "iceberg/result.h" namespace iceberg { @@ -66,7 +67,20 @@ class ICEBERG_DATA_EXPORT PositionDeleteIndex { /// \param other The index to merge (union operation) void Merge(const PositionDeleteIndex& other); + /// \brief Serialize the index into a `deletion-vector-v1` blob. + /// + /// The positions are run-length encoded, then framed per the Puffin spec: + /// https://iceberg.apache.org/puffin-spec/#deletion-vector-v1-blob-type + Result> Serialize(); + + /// \brief Deserialize a `deletion-vector-v1` blob into an index. + /// + /// Validates the length prefix, magic sequence, and CRC-32 checksum. + static Result Deserialize(std::span blob); + private: + explicit PositionDeleteIndex(RoaringPositionBitmap bitmap); + // Bulk-add positions sharing high-32-bit `key`. Private hook for // `ForEachPositionDelete`'s bulk path; keeps `Delete` the sole public // mutation surface. diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 58d79a402..a06bde20b 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -170,6 +170,7 @@ iceberg_data_sources = files( 'data/data_writer.cc', 'data/delete_filter.cc', 'data/delete_loader.cc', + 'data/deletion_vector_writer.cc', 'data/equality_delete_writer.cc', 'data/file_scan_task_reader.cc', 'data/position_delete_writer.cc', diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0756c1eef..805d7f59b 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -161,6 +161,9 @@ add_iceberg_test(puffin_test puffin_json_test.cc puffin_reader_writer_test.cc) +add_iceberg_test(deletion_vector_writer_test USE_DATA SOURCES + deletion_vector_writer_test.cc) + if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test USE_BUNDLE diff --git a/src/iceberg/test/delete_filter_test.cc b/src/iceberg/test/delete_filter_test.cc index 89d1b6b85..4139fcc3c 100644 --- a/src/iceberg/test/delete_filter_test.cc +++ b/src/iceberg/test/delete_filter_test.cc @@ -34,6 +34,7 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/data/deletion_vector_writer.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" @@ -249,6 +250,23 @@ class DeleteFilterTest : public ::testing::Test { static constexpr std::string_view kDataPath = "data.parquet"; + Result> DeletionVectorFile( + const std::string& path, const std::vector& positions, + const std::string& data_path = std::string(kDataPath)) { + DeletionVectorWriterOptions options{ + .path = path, + .io = file_io_, + }; + ICEBERG_ASSIGN_OR_RAISE(auto writer, DeletionVectorWriter::Make(std::move(options))); + for (int64_t pos : positions) { + ICEBERG_RETURN_UNEXPECTED( + writer->Delete(data_path, pos, partition_spec_, PartitionValues{})); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.delete_files[0]; + } + std::shared_ptr file_io_; std::shared_ptr table_schema_; std::shared_ptr partition_spec_; @@ -1156,7 +1174,7 @@ TEST_F(DeleteFilterTest, DeletionVectorErrorPropagatesFromCompute) { ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*filter.value()->RequiredSchema(), R"([[1, 0]])")); auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); - ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(alive, IsError(ErrorKind::kInvalidArgument)); } TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) { @@ -1175,7 +1193,27 @@ TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) { auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); - ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(alive, IsError(ErrorKind::kInvalidArgument)); +} + +TEST_F(DeleteFilterTest, DeletionVectorComputeAliveRows) { + // Write a real deletion vector with DeletionVectorWriter, then load it through + // DeleteFilter (DeleteLoader::LoadDV) and verify deleted positions are filtered. + ICEBERG_UNWRAP_OR_FAIL(auto dv, DeletionVectorFile("dv-alive.puffin", {1, 3})); + std::vector> delete_files = {dv}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_, + /*need_row_pos_col=*/true); + ASSERT_THAT(filter, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([[10, 0], [20, 1], [30, 2], [40, 3], [50, 4]])")); + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), {0, 2, 4}); } TEST_F(DeleteFilterTest, CounterAccumulatesAcrossBatches) { diff --git a/src/iceberg/test/delete_loader_test.cc b/src/iceberg/test/delete_loader_test.cc index b392065cf..15d5980e7 100644 --- a/src/iceberg/test/delete_loader_test.cc +++ b/src/iceberg/test/delete_loader_test.cc @@ -26,10 +26,12 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/data/deletion_vector_writer.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/deletes/position_delete_index.h" #include "iceberg/file_format.h" +#include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/parquet/parquet_register.h" #include "iceberg/partition_spec.h" @@ -110,6 +112,21 @@ class DeleteLoaderTest : public ::testing::Test { return writer->Metadata().value().data_files[0]; } + /// Write a deletion vector to a Puffin file and return the DataFile metadata. + std::shared_ptr WriteDeletionVector(const std::string& path, + const std::string& referenced_data_file, + const std::vector& positions) { + auto writer = DeletionVectorWriter::Make( + DeletionVectorWriterOptions{.path = path, .io = file_io_}) + .value(); + for (int64_t pos : positions) { + ICEBERG_THROW_NOT_OK( + writer->Delete(referenced_data_file, pos, partition_spec_, PartitionValues{})); + } + ICEBERG_THROW_NOT_OK(writer->Close()); + return writer->Metadata().value().delete_files[0]; + } + std::shared_ptr file_io_; std::shared_ptr schema_; std::shared_ptr partition_spec_; @@ -239,15 +256,73 @@ TEST_F(DeleteLoaderTest, LoadPositionDeletesFastPathHonorsReferencedDataFile) { ASSERT_FALSE(index.IsDeleted(kRowCount)); } -TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsDV) { +TEST_F(DeleteLoaderTest, LoadDeletionVector) { + auto dv_file = + WriteDeletionVector("dv-a.puffin", "data.parquet", {0, 5, 10, 4'000'000'000LL}); + + std::vector> files = {dv_file}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsOk()); + + auto& index = result.value(); + ASSERT_EQ(index.Cardinality(), 4); + ASSERT_TRUE(index.IsDeleted(0)); + ASSERT_TRUE(index.IsDeleted(5)); + ASSERT_TRUE(index.IsDeleted(10)); + ASSERT_TRUE(index.IsDeleted(4'000'000'000LL)); + ASSERT_FALSE(index.IsDeleted(1)); +} + +TEST_F(DeleteLoaderTest, LoadDeletionVectorSkipsMismatchedReferencedDataFile) { + auto dv_file = WriteDeletionVector("dv-b.puffin", "other-data.parquet", {1, 2, 3}); + + std::vector> files = {dv_file}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result.value().IsEmpty()); +} + +TEST_F(DeleteLoaderTest, LoadDeletionVectorRequiresContentOffsetAndSize) { auto dv_file = std::make_shared(DataFile{ .content = DataFile::Content::kPositionDeletes, .file_path = "dv.puffin", .file_format = FileFormatType::kPuffin, + .referenced_data_file = "data.parquet", }); std::vector> files = {dv_file}; auto result = loader_->LoadPositionDeletes(files, "data.parquet"); - ASSERT_THAT(result, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument)); +} + +TEST_F(DeleteLoaderTest, LoadDeletionVectorRejectsCardinalityMismatch) { + auto dv_file = WriteDeletionVector("dv-card.puffin", "data.parquet", {0, 1, 2}); + // Corrupt the recorded cardinality so it no longer matches the bitmap. + dv_file->record_count = 99; + + std::vector> files = {dv_file}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument)); +} + +// Iceberg uses either a deletion vector or position deletes for a data file, not +// both. This exercises the loader's robustness: a mixed list is merged into one +// index, with each source filtered by the target data file path. +TEST_F(DeleteLoaderTest, LoadMixedDeletionVectorAndPositionDeletes) { + auto dv = WriteDeletionVector("dv-mixed.puffin", "data.parquet", {0, 5}); + auto pos = WritePositionDeletes("pos-mixed.parquet", + {{"data.parquet", 10}, {"data.parquet", 20}}); + + std::vector> files = {dv, pos}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsOk()); + + auto& index = result.value(); + EXPECT_EQ(index.Cardinality(), 4); + EXPECT_TRUE(index.IsDeleted(0)); + EXPECT_TRUE(index.IsDeleted(5)); + EXPECT_TRUE(index.IsDeleted(10)); + EXPECT_TRUE(index.IsDeleted(20)); + EXPECT_FALSE(index.IsDeleted(1)); } TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsWrongContent) { diff --git a/src/iceberg/test/deletion_vector_writer_test.cc b/src/iceberg/test/deletion_vector_writer_test.cc new file mode 100644 index 000000000..fdc70b1a2 --- /dev/null +++ b/src/iceberg/test/deletion_vector_writer_test.cc @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/deletion_vector_writer.h" + +#include +#include +#include + +#include + +#include "iceberg/data/delete_loader.h" +#include "iceberg/deletes/position_delete_index.h" +#include "iceberg/deletes/roaring_position_bitmap.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_io.h" + +namespace iceberg { + +namespace { + +std::shared_ptr FindByReferencedFile( + const std::vector>& files, const std::string& ref) { + for (const auto& file : files) { + if (file->referenced_data_file == ref) { + return file; + } + } + return nullptr; +} + +std::shared_ptr UnpartitionedSpec() { + return PartitionSpec::Unpartitioned(); +} + +} // namespace + +// Full write -> read round trip: write deletion vectors with the writer, then +// load them back through DeleteLoader using the produced DataFile metadata. +TEST(DeletionVectorWriterTest, WriteThenLoadEndToEnd) { + auto io = std::make_shared(); + auto spec = UnpartitionedSpec(); + + std::vector> delete_files; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://deletes.puffin", + .io = io, + .properties = {{"created-by", "iceberg-cpp-test"}}, + })); + + ASSERT_THAT(writer->Delete("data-a.parquet", 0, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Delete("data-a.parquet", 5, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Delete("data-a.parquet", 10, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Delete("data-b.parquet", 1, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Delete("data-b.parquet", 2, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + delete_files = result.delete_files; + // Each referenced data file is reported once. + EXPECT_EQ(result.referenced_data_files.size(), 2u); + // No previous deletes were loaded, so nothing was rewritten. + EXPECT_TRUE(result.rewritten_delete_files.empty()); + } + + // One DataFile per referenced data file. + ASSERT_EQ(delete_files.size(), 2u); + + auto dv_a = FindByReferencedFile(delete_files, "data-a.parquet"); + auto dv_b = FindByReferencedFile(delete_files, "data-b.parquet"); + ASSERT_NE(dv_a, nullptr); + ASSERT_NE(dv_b, nullptr); + + // Metadata is spec-compliant for a deletion vector. + EXPECT_EQ(dv_a->content, DataFile::Content::kPositionDeletes); + EXPECT_EQ(dv_a->file_format, FileFormatType::kPuffin); + EXPECT_TRUE(dv_a->IsDeletionVector()); + EXPECT_EQ(dv_a->file_path, "memory://deletes.puffin"); + EXPECT_EQ(dv_a->record_count, 3); + EXPECT_TRUE(dv_a->content_offset.has_value()); + EXPECT_TRUE(dv_a->content_size_in_bytes.has_value()); + EXPECT_GT(dv_a->file_size_in_bytes, 0); + EXPECT_EQ(dv_a->partition_spec_id, spec->spec_id()); + EXPECT_EQ(dv_b->record_count, 2); + + // Both blobs live in the same Puffin file but at different offsets. + EXPECT_EQ(dv_a->file_path, dv_b->file_path); + EXPECT_NE(dv_a->content_offset.value(), dv_b->content_offset.value()); + + // Read back through DeleteLoader for data-a.parquet. + DeleteLoader loader(io); + { + auto result = loader.LoadPositionDeletes(delete_files, "data-a.parquet"); + ASSERT_THAT(result, IsOk()); + auto& index = result.value(); + EXPECT_EQ(index.Cardinality(), 3); + EXPECT_TRUE(index.IsDeleted(0)); + EXPECT_TRUE(index.IsDeleted(5)); + EXPECT_TRUE(index.IsDeleted(10)); + EXPECT_FALSE(index.IsDeleted(1)); + } + + // And for data-b.parquet (the loader filters by referenced_data_file). + { + auto result = loader.LoadPositionDeletes(delete_files, "data-b.parquet"); + ASSERT_THAT(result, IsOk()); + auto& index = result.value(); + EXPECT_EQ(index.Cardinality(), 2); + EXPECT_TRUE(index.IsDeleted(1)); + EXPECT_TRUE(index.IsDeleted(2)); + EXPECT_FALSE(index.IsDeleted(0)); + } +} + +// The PositionDeleteIndex overload bulk-adds positions for a data file. +TEST(DeletionVectorWriterTest, DeleteFromIndex) { + auto io = std::make_shared(); + auto spec = UnpartitionedSpec(); + + PositionDeleteIndex positions; + positions.Delete(0); + positions.Delete(3, 6); // [3, 6) -> 3, 4, 5 + + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://from-index.puffin", .io = io})); + ASSERT_THAT(writer->Delete("data.parquet", positions, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + ASSERT_EQ(result.delete_files.size(), 1u); + EXPECT_EQ(result.delete_files[0]->record_count, 4); + + DeleteLoader loader(io); + auto loaded = loader.LoadPositionDeletes(result.delete_files, "data.parquet"); + ASSERT_THAT(loaded, IsOk()); + EXPECT_EQ(loaded.value().Cardinality(), 4); + EXPECT_TRUE(loaded.value().IsDeleted(0)); + EXPECT_TRUE(loaded.value().IsDeleted(5)); + EXPECT_FALSE(loaded.value().IsDeleted(6)); +} + +// Previously written deletes are merged into the new vector, and the file-scoped +// delete files they came from are reported as rewritten. +TEST(DeletionVectorWriterTest, LoadPreviousDeletesMergesAndReportsRewritten) { + auto io = std::make_shared(); + auto spec = UnpartitionedSpec(); + + auto previous_dv = std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = "memory://old.puffin", + .file_format = FileFormatType::kPuffin, + .referenced_data_file = "data.parquet", + }); + + ICEBERG_UNWRAP_OR_FAIL( + auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://merged.puffin", + .io = io, + .load_previous_deletes = [&](std::string_view path) -> Result { + if (path != "data.parquet") { + return PreviousDeletes{}; + } + auto index = std::make_shared(); + index->Delete(100); + index->Delete(200); + return PreviousDeletes{.index = index, .delete_files = {previous_dv}}; + }, + })); + + ASSERT_THAT(writer->Delete("data.parquet", 0, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + ASSERT_EQ(result.delete_files.size(), 1u); + // New position plus the two previous positions. + EXPECT_EQ(result.delete_files[0]->record_count, 3); + // The previous DV is file-scoped, so it is reported for removal. + ASSERT_EQ(result.rewritten_delete_files.size(), 1u); + EXPECT_EQ(result.rewritten_delete_files[0]->file_path, "memory://old.puffin"); + + DeleteLoader loader(io); + auto loaded = loader.LoadPositionDeletes(result.delete_files, "data.parquet"); + ASSERT_THAT(loaded, IsOk()); + EXPECT_EQ(loaded.value().Cardinality(), 3); + EXPECT_TRUE(loaded.value().IsDeleted(0)); + EXPECT_TRUE(loaded.value().IsDeleted(100)); + EXPECT_TRUE(loaded.value().IsDeleted(200)); +} + +// A previous delete that is not file-scoped (e.g. a partition-scoped position +// delete) is merged into the new vector but is NOT reported as rewritten. +TEST(DeletionVectorWriterTest, PartitionScopedPreviousDeleteMergesButNotRewritten) { + auto io = std::make_shared(); + auto spec = UnpartitionedSpec(); + + // No referenced_data_file and no equal file_path bounds -> not file-scoped. + auto previous_position_delete = std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = "memory://partition-deletes.parquet", + .file_format = FileFormatType::kParquet, + }); + + ICEBERG_UNWRAP_OR_FAIL( + auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://merged-partition.puffin", + .io = io, + .load_previous_deletes = [&](std::string_view path) -> Result { + auto index = std::make_shared(); + index->Delete(50); + return PreviousDeletes{.index = index, + .delete_files = {previous_position_delete}}; + }, + })); + + ASSERT_THAT(writer->Delete("data.parquet", 0, spec, PartitionValues{}), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + ASSERT_EQ(result.delete_files.size(), 1u); + // The previous position was merged in. + EXPECT_EQ(result.delete_files[0]->record_count, 2); + // The previous delete is partition-scoped, so it is not rewritten. + EXPECT_TRUE(result.rewritten_delete_files.empty()); + + DeleteLoader loader(io); + auto loaded = loader.LoadPositionDeletes(result.delete_files, "data.parquet"); + ASSERT_THAT(loaded, IsOk()); + EXPECT_EQ(loaded.value().Cardinality(), 2); + EXPECT_TRUE(loaded.value().IsDeleted(0)); + EXPECT_TRUE(loaded.value().IsDeleted(50)); +} + +TEST(DeletionVectorWriterTest, EmptyWriterProducesNoDataFiles) { + auto io = std::make_shared(); + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://empty.puffin", .io = io})); + ASSERT_THAT(writer->Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + EXPECT_TRUE(result.delete_files.empty()); + + // No blobs were written, so no (orphan) Puffin file should have been created. + EXPECT_THAT(io->NewInputFile("memory://empty.puffin"), IsError(ErrorKind::kNotFound)); +} + +TEST(DeletionVectorWriterTest, DeleteRejectsInvalidPosition) { + auto io = std::make_shared(); + auto spec = UnpartitionedSpec(); + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://invalid.puffin", .io = io})); + EXPECT_THAT(writer->Delete("", 0, spec, PartitionValues{}), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorWriterTest, MakeRejectsNullIo) { + EXPECT_THAT(DeletionVectorWriter::Make(DeletionVectorWriterOptions{.path = "x.puffin"}), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorWriterTest, MakeRejectsEmptyPath) { + auto io = std::make_shared(); + EXPECT_THAT(DeletionVectorWriter::Make(DeletionVectorWriterOptions{.io = io}), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorWriterTest, DeleteAfterCloseFails) { + auto io = std::make_shared(); + auto spec = UnpartitionedSpec(); + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://closed.puffin", .io = io})); + ASSERT_THAT(writer->Close(), IsOk()); + EXPECT_THAT(writer->Delete("data-a.parquet", 0, spec, PartitionValues{}), + IsError(ErrorKind::kValidationFailed)); +} + +} // namespace iceberg diff --git a/src/iceberg/test/file_scan_task_reader_test.cc b/src/iceberg/test/file_scan_task_reader_test.cc index 1630a108a..d09ff80f4 100644 --- a/src/iceberg/test/file_scan_task_reader_test.cc +++ b/src/iceberg/test/file_scan_task_reader_test.cc @@ -38,6 +38,7 @@ #include "iceberg/arrow/arrow_register.h" #include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/data/deletion_vector_writer.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" @@ -298,6 +299,23 @@ class FileScanTaskReaderTest : public TempFileTestBase { return metadata.data_files[0]; } + Result> MakeDeletionVectorFile( + const std::string& path, const std::vector& positions, + const std::string& data_path) { + DeletionVectorWriterOptions options{ + .path = path, + .io = file_io_, + }; + ICEBERG_ASSIGN_OR_RAISE(auto writer, DeletionVectorWriter::Make(std::move(options))); + for (int64_t pos : positions) { + ICEBERG_RETURN_UNEXPECTED( + writer->Delete(data_path, pos, partition_spec_, PartitionValues{})); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.delete_files[0]; + } + void VerifyStream(struct ArrowArrayStream* stream, std::string_view expected_json) { auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); @@ -401,6 +419,30 @@ TEST_F(FileScanTaskReaderTest, OpenWithPositionDeletesFiltersRowsAndPrunesPos) { ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); } +TEST_F(FileScanTaskReaderTest, OpenWithDeletionVectorFiltersRows) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto dv, MakeDeletionVectorFile(CreateNewTempFilePathWithSuffix(".puffin"), {1}, + data_file->file_path)); + FileScanTask task(data_file, {dv}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); +} + TEST_F(FileScanTaskReaderTest, OpenWithEqualityDeletesAddsAndPrunesDeleteOnlyColumns) { ICEBERG_UNWRAP_OR_FAIL( auto data_file, diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index b01a61904..d173ed17f 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -123,6 +123,10 @@ iceberg_tests = { ), 'use_data': true, }, + 'deletion_vector_writer_test': { + 'sources': files('deletion_vector_writer_test.cc'), + 'use_data': true, + }, } if get_option('rest').enabled() diff --git a/src/iceberg/test/position_delete_index_test.cc b/src/iceberg/test/position_delete_index_test.cc index 0c24a587b..33ba09a9d 100644 --- a/src/iceberg/test/position_delete_index_test.cc +++ b/src/iceberg/test/position_delete_index_test.cc @@ -19,8 +19,13 @@ #include "iceberg/deletes/position_delete_index.h" +#include +#include + #include +#include "iceberg/test/matchers.h" + namespace iceberg { TEST(PositionDeleteIndexTest, TestEmptyIndex) { @@ -200,4 +205,89 @@ TEST(PositionDeleteIndexTest, TestMergeIdempotence) { ASSERT_TRUE(index1.IsDeleted(20)); } +// ==================== deletion-vector-v1 serialization ==================== + +TEST(PositionDeleteIndexTest, SerializeRoundTrip) { + PositionDeleteIndex index; + for (int64_t pos : + {int64_t{0}, int64_t{1}, int64_t{5}, int64_t{100}, int64_t{4'000'000'000}}) { + index.Delete(pos); + } + + ICEBERG_UNWRAP_OR_FAIL(auto blob, index.Serialize()); + ICEBERG_UNWRAP_OR_FAIL(auto restored, PositionDeleteIndex::Deserialize(blob)); + + EXPECT_EQ(restored.Cardinality(), 5); + for (int64_t pos : + {int64_t{0}, int64_t{1}, int64_t{5}, int64_t{100}, int64_t{4'000'000'000}}) { + EXPECT_TRUE(restored.IsDeleted(pos)); + } +} + +TEST(PositionDeleteIndexTest, SerializeEmptyRoundTrip) { + PositionDeleteIndex index; + ICEBERG_UNWRAP_OR_FAIL(auto blob, index.Serialize()); + ICEBERG_UNWRAP_OR_FAIL(auto restored, PositionDeleteIndex::Deserialize(blob)); + EXPECT_TRUE(restored.IsEmpty()); +} + +// Spans two high-32-bit keys and exercises all Roaring container types +// (sparse "array", dense "bitset", and run containers after optimization). +TEST(PositionDeleteIndexTest, SerializeAllContainerTypesAcrossKeys) { + constexpr int64_t kKeyStride = 0x100000000LL; // 2^32: high-32-bit key + constexpr int64_t kContainerStride = 1 << 16; // 2^16: Roaring container + auto pos = [](int64_t key, int64_t container, int64_t value) { + return key * kKeyStride + container * kContainerStride + value; + }; + + PositionDeleteIndex index; + int64_t expected = 0; + for (int64_t key : {int64_t{0}, int64_t{1}}) { + index.Delete(pos(key, 0, 5)); + index.Delete(pos(key, 0, 7)); + expected += 2; + index.Delete(pos(key, 1, 1), pos(key, 1, 1000)); + expected += 999; + index.Delete(pos(key, 2, 1), pos(key, 2, kContainerStride)); + expected += kContainerStride - 1; + } + + ICEBERG_UNWRAP_OR_FAIL(auto blob, index.Serialize()); + ICEBERG_UNWRAP_OR_FAIL(auto restored, PositionDeleteIndex::Deserialize(blob)); + + EXPECT_EQ(restored.Cardinality(), expected); + EXPECT_TRUE(restored.IsDeleted(pos(0, 0, 5))); + EXPECT_TRUE(restored.IsDeleted(pos(1, 2, kContainerStride - 1))); + EXPECT_TRUE(restored.IsDeleted(pos(0, 1, 999))); + EXPECT_FALSE(restored.IsDeleted(pos(0, 0, 6))); + EXPECT_FALSE(restored.IsDeleted(pos(1, 1, 1000))); // range end is exclusive +} + +TEST(PositionDeleteIndexTest, DeserializeRejectsCorruptedCrc) { + PositionDeleteIndex index; + index.Delete(1); + index.Delete(2); + ICEBERG_UNWRAP_OR_FAIL(auto blob, index.Serialize()); + + blob.back() ^= 0xFF; + EXPECT_THAT(PositionDeleteIndex::Deserialize(blob), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PositionDeleteIndexTest, DeserializeRejectsBadMagic) { + PositionDeleteIndex index; + index.Delete(1); + ICEBERG_UNWRAP_OR_FAIL(auto blob, index.Serialize()); + + blob[4] = 0x00; + EXPECT_THAT(PositionDeleteIndex::Deserialize(blob), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(PositionDeleteIndexTest, DeserializeRejectsTruncatedBlob) { + std::vector blob = {0x00, 0x00}; + EXPECT_THAT(PositionDeleteIndex::Deserialize(blob), + IsError(ErrorKind::kInvalidArgument)); +} + } // namespace iceberg