diff --git a/.github/workflows/build-macos.yaml b/.github/workflows/build-macos.yaml index d53872d79..d0beb544b 100644 --- a/.github/workflows/build-macos.yaml +++ b/.github/workflows/build-macos.yaml @@ -36,10 +36,10 @@ jobs: strategy: matrix: build_type: [RelWithDebugInfo] - cxx: [clang++-15] + cxx: [clang++-19] include: - - cxx: clang++-15 - package: llvm@15 + - cxx: clang++-19 + package: llvm@19 cc_name: clang cxx_name: clang++ needs_prefix: true diff --git a/bindings/python/include/svs/python/manager.h b/bindings/python/include/svs/python/manager.h index 3583c6836..68188c243 100644 --- a/bindings/python/include/svs/python/manager.h +++ b/bindings/python/include/svs/python/manager.h @@ -44,7 +44,10 @@ pybind11::tuple py_search( matrix_view(result_idx), matrix_view(result_dists) ); - svs::index::search_batch_into(self, q_result, query_data.cview()); + { + pybind11::gil_scoped_release release; + svs::index::search_batch_into(self, q_result, query_data.cview()); + } return pybind11::make_tuple(result_idx, result_dists); } @@ -86,7 +89,7 @@ void add_threading_interface(pybind11::class_& manager) { "num_threads", &Manager::get_num_threads, [](Manager& self, int num_threads) { - self.set_threadpool(svs::threads::DefaultThreadPool(num_threads)); + self.set_threadpool(svs::threads::SwitchNativeThreadPool(num_threads)); }, "Read/Write (int): Get and set the number of threads used to process queries." ); diff --git a/bindings/python/src/dynamic_vamana.cpp b/bindings/python/src/dynamic_vamana.cpp index 1b24eca1b..9361ea611 100644 --- a/bindings/python/src/dynamic_vamana.cpp +++ b/bindings/python/src/dynamic_vamana.cpp @@ -174,7 +174,12 @@ void add_points( "Expected IDs to be the same length as the number of rows in points!" ); } - index.add_points(data_view(py_data), std::span(ids.data(), ids.size()), reuse_empty); + auto data = data_view(py_data); + auto id_span = std::span(ids.data(), ids.size()); + { + py::gil_scoped_release release; + index.add_points(data, id_span, reuse_empty); + } } const char* ADD_POINTS_DOCSTRING = R"( @@ -381,8 +386,18 @@ void wrap(py::module& m) { add_dynamic_vamana_properties(vamana); - vamana.def("consolidate", &svs::DynamicVamana::consolidate, CONSOLIDATE_DOCSTRING); - vamana.def("compact", &svs::DynamicVamana::compact, COMPACT_DOCSTRING); + vamana.def( + "consolidate", + &svs::DynamicVamana::consolidate, + py::call_guard(), + CONSOLIDATE_DOCSTRING + ); + vamana.def( + "compact", + &svs::DynamicVamana::compact, + py::call_guard(), + COMPACT_DOCSTRING + ); // Reloading vamana.def( @@ -435,7 +450,11 @@ void wrap(py::module& m) { vamana.def( "delete", [](svs::DynamicVamana& index, const py_contiguous_array_t& ids) { - index.delete_points(as_span(ids)); + auto id_span = as_span(ids); + { + py::gil_scoped_release release; + index.delete_points(id_span); + } }, py::arg("ids"), DELETE_DOCSTRING diff --git a/bindings/python/src/python_bindings.cpp b/bindings/python/src/python_bindings.cpp index e7c14bf6f..f83057fc1 100644 --- a/bindings/python/src/python_bindings.cpp +++ b/bindings/python/src/python_bindings.cpp @@ -156,7 +156,7 @@ class ScopedModuleNameOverride { } // namespace -PYBIND11_MODULE(_svs, m) { +PYBIND11_MODULE(_svs, m, py::mod_gil_not_used()) { // Internally, the top level `__init__.py` imports everything from the C++ module named // `_svs`. // diff --git a/include/svs/concepts/graph.h b/include/svs/concepts/graph.h index d3e139eca..07fbc6f86 100644 --- a/include/svs/concepts/graph.h +++ b/include/svs/concepts/graph.h @@ -50,6 +50,15 @@ namespace svs::graphs { +/// Outcome of `MemoryGraph::add_edge(src, dst)`. Distinguishes three cases so callers +/// can route dropped edges (e.g. to a backedge buffer) without a TOCTOU race between a +/// pre-check and the insert. +enum class AddEdgeResult : uint8_t { + Added, // Edge was inserted. + AlreadyExists, // Edge was already present (or self-loop). Not inserted. + Full, // Node's adjacency list is at max_degree. Edge NOT inserted. +}; + // clang-format off /// @@ -135,12 +144,13 @@ template using index_type_t = typename G::index_type; /// @code{.cpp} /// template /// concept MemoryGraph = requires(T& g, const T& const_g) { -/// // Add an edge to the graph. -/// // Must return the out degree of `src` after adding the edge `src -> dst`. -/// // If adding the edge would result in the graph exceeding its maximum degree, -/// // implementations are free to not add this edge. +/// // Add an edge to the graph atomically. Returns an AddEdgeResult indicating: +/// // Added - edge was inserted +/// // AlreadyExists - edge was already present (or self-loop); no insert +/// // Full - node is at max_degree; edge NOT inserted (caller should +/// // route to an overflow buffer if needed) /// requires requires(index_type_t src, index_type_t dst) { -/// { g.add_edge(src, dst) } -> std::convertible_to; +/// { g.add_edge(src, dst) } -> std::convertible_to; /// }; /// /// // Completely clear the adjacency list for vertex ``i``. @@ -164,7 +174,7 @@ template concept MemoryGraph = requires(T& g, const T& const_g) { // Adding an edge. requires requires(index_type_t src, index_type_t dst) { - { g.add_edge(src, dst) } -> std::convertible_to; + { g.add_edge(src, dst) } -> std::convertible_to; }; // Clear adjacency list. diff --git a/include/svs/core/data/simple.h b/include/svs/core/data/simple.h index 251f30559..a41cf42e3 100644 --- a/include/svs/core/data/simple.h +++ b/include/svs/core/data/simple.h @@ -30,6 +30,7 @@ #include "svs/lib/misc.h" #include "svs/lib/prefetch.h" #include "svs/lib/saveload.h" +#include "svs/lib/segmented_vector.h" #include "svs/lib/threads.h" #include "svs/lib/uuid.h" @@ -728,7 +729,6 @@ class SimpleData> { , allocator_{alloc} { size_t elements_per_block = blocksize_.value(); size_t num_blocks = lib::div_round_up(n_elements, elements_per_block); - blocks_.reserve(num_blocks); for (size_t i = 0; i < num_blocks; ++i) { add_block(); } @@ -781,10 +781,10 @@ class SimpleData> { /// Add a new data block to the end of the current collection of blocks. /// void add_block() { - blocks_.emplace_back( + blocks_.push_back(array_type( make_dims(blocksize().value(), lib::forward_extent(dimensions())), allocator_.get_allocator() - ); + )); } /// @@ -952,7 +952,12 @@ class SimpleData> { private: // The blocksize in terms of number of vectors. lib::PowerOfTwo blocksize_; - std::vector blocks_; + // Grow-stable directory of fixed-size blocks: appending a block never relocates the + // existing block wrappers (or the heap buffers they point to), so a concurrent + // lock-free reader subscripting blocks_[block_id] is safe against a writer growing the + // dataset. Element addressing (block_id, data_id) is unchanged; only the outer block + // directory is grow-stable (2-level lock-free array). See svs/lib/segmented_vector.h. + lib::SegmentedVector blocks_; size_t dimensions_; size_t size_; Blocked allocator_; diff --git a/include/svs/core/graph/graph.h b/include/svs/core/graph/graph.h index 05db19b7c..64431484b 100644 --- a/include/svs/core/graph/graph.h +++ b/include/svs/core/graph/graph.h @@ -20,8 +20,14 @@ #include "svs/core/data/simple.h" #include "svs/lib/algorithms.h" #include "svs/lib/boundscheck.h" +#include "svs/lib/concurrency/atomic_span.h" +#include "svs/lib/concurrency/seqlock.h" #include "svs/lib/saveload.h" +#include "svs/lib/segmented_vector.h" +#include "svs/lib/spinlock.h" +#include +#include #include #include #include @@ -57,12 +63,12 @@ template class SimpleGrap /// The integer representation used to represent vertices in this graph. using index_type = Idx; using value_type = std::span; - using const_value_type = std::span; + using const_value_type = AtomicSpan; /// Type used to represent mutable adjacency lists externally. using reference = std::span; /// Type used to represent constant adjacency lists externally. - using const_reference = std::span; + using const_reference = AtomicSpan; /// /// @brief Construct an empty graph of the desired size. @@ -75,7 +81,9 @@ template class SimpleGrap /// explicit SimpleGraphBase(size_t num_nodes, size_t max_degree) : data_{num_nodes, max_degree + 1} - , max_degree_{lib::narrow(max_degree)} { + , max_degree_{lib::narrow(max_degree)} + , seq_counters_(num_nodes) + , node_locks_(num_nodes) { reset(); } @@ -85,15 +93,19 @@ template class SimpleGrap size_t num_nodes, size_t max_degree, const Allocator& allocator ) : data_{num_nodes, max_degree + 1, allocator} - , max_degree_{lib::narrow(max_degree)} { + , max_degree_{lib::narrow(max_degree)} + , seq_counters_(num_nodes) + , node_locks_(num_nodes) { reset(); } explicit SimpleGraphBase(data_type data) : data_{std::move(data)} - , max_degree_{lib::narrow(data_.dimensions() - 1)} {} + , max_degree_{lib::narrow(data_.dimensions() - 1)} + , seq_counters_(data_.size()) + , node_locks_(data_.size()) {} - const_reference raw_row(Idx i) const { return data_.get_datum(i); } + std::span raw_row(Idx i) const { return data_.get_datum(i); } /// /// @brief Return the outward adjacency list for vertex ``i``. @@ -103,14 +115,17 @@ template class SimpleGrap const_reference get_node(Idx i) const { // Get the raw data. std::span raw_data = data_.get_datum(i); - auto num_neighbors = raw_data.front(); + Idx num_neighbors = std::atomic_ref(const_cast(raw_data.front())) + .load(std::memory_order_relaxed); + // Clamp to max_degree to safely handle torn reads of the length field. + num_neighbors = std::min(num_neighbors, max_degree_); - // Maybe prefetch the rest of the adjacncy list. + // Maybe prefetch the rest of the adjacency list. size_t bytes = (1 + num_neighbors) * sizeof(Idx); if (bytes > lib::CACHELINE_BYTES) { lib::prefetch(std::as_bytes(raw_data).subspan(lib::CACHELINE_BYTES)); } - return raw_data.subspan(1, num_neighbors); + return AtomicSpan(raw_data.data() + 1, num_neighbors); } /// @@ -119,16 +134,28 @@ template class SimpleGrap /// Complexity: Linear in the maximum degree. /// bool has_edge(Idx src, Idx dst) const { - const auto& list = get_node(src); - auto begin = list.begin(); - auto end = list.end(); - return (std::find(begin, end, dst) != end); + for (;;) { + auto maybe_seq = seq_counters_[src].read_begin(); + if (!maybe_seq) { + detail::pause(); + continue; + } + const auto& list = get_node(src); + bool found = (std::find(list.begin(), list.end(), dst) != list.end()); + if (seq_counters_[src].read_validate(*maybe_seq)) { + return found; + } + detail::pause(); + } } /// /// @brief Return the current out degree of vertex ``i``. /// - size_t get_node_degree(Idx i) const { return data_.get_datum(i).front(); } + size_t get_node_degree(Idx i) const { + return std::atomic_ref(const_cast(data_.get_datum(i).front())) + .load(std::memory_order_relaxed); + } /// /// @brief Prefetch the adjacency list for node ``i`` into the L1 cache. @@ -144,8 +171,11 @@ template class SimpleGrap /// The complexity of this operation is `O(1)`. /// void clear_node(Idx i) { - Idx& num_neighbors = data_.get_datum(i).front(); - num_neighbors = 0; + std::lock_guard lock{node_locks_[i]}; + auto seq = seq_counters_[i].begin_write(); + std::atomic_ref(data_.get_datum(i).front()) + .store(0, std::memory_order_relaxed); + seq_counters_[i].end_write(seq); } /// @@ -177,6 +207,7 @@ template class SimpleGrap /// @copydoc replace_node(Idx,const std::vector&) void replace_node(Idx i, std::span new_neighbors) { + std::lock_guard lock{node_locks_[i]}; std::span raw_data = data_.get_datum(i); // Clamp the number of elements to copy to the maximum out degree to correctly @@ -186,13 +217,31 @@ template class SimpleGrap Idx elements_to_copy = std::min(max_degree_, lib::narrow_cast(new_neighbors.size())); - std::span adjusted_neighbors = new_neighbors.first(elements_to_copy); - value_type adjacency_list = raw_data.subspan(1, elements_to_copy); + auto seq = seq_counters_[i].begin_write(); + for (Idx j = 0; j < elements_to_copy; ++j) { + std::atomic_ref(raw_data[1 + j]) + .store(new_neighbors[j], std::memory_order_relaxed); + } + std::atomic_ref(raw_data[0]) + .store(elements_to_copy, std::memory_order_relaxed); + seq_counters_[i].end_write(seq); + } - std::copy( - adjusted_neighbors.begin(), adjusted_neighbors.end(), adjacency_list.begin() - ); - raw_data.front() = elements_to_copy; + /// @copydoc replace_node(Idx,const std::vector&) + void replace_node(Idx i, AtomicSpan new_neighbors) { + std::lock_guard lock{node_locks_[i]}; + std::span raw_data = data_.get_datum(i); + Idx elements_to_copy = + std::min(max_degree_, lib::narrow_cast(new_neighbors.size())); + + auto seq = seq_counters_[i].begin_write(); + for (Idx j = 0; j < elements_to_copy; ++j) { + std::atomic_ref(raw_data[1 + j]) + .store(new_neighbors[j], std::memory_order_relaxed); + } + std::atomic_ref(raw_data[0]) + .store(elements_to_copy, std::memory_order_relaxed); + seq_counters_[i].end_write(seq); } /// @@ -208,10 +257,10 @@ template class SimpleGrap /// * ``get_node_degree(src) == max_degree()`` (adjacency list is already full) /// * ``dst`` is already an out-neighbor of ``src``. /// - size_t add_edge(Idx src, Idx dst) { + AddEdgeResult add_edge(Idx src, Idx dst) { // Don't assign a node as its own neighbor. if (src == dst) { - return get_node_degree(src); + return AddEdgeResult::AlreadyExists; } if constexpr (checkbounds_v) { @@ -225,11 +274,15 @@ template class SimpleGrap } } + // Acquire lock — all reads and writes under the lock to prevent + // concurrent writers from seeing stale state. + std::lock_guard lock{node_locks_[src]}; + // Check if there's room for the new node. std::span raw_data = data_.get_datum(src); Idx current_size = raw_data.front(); if (current_size == max_degree_) { - return current_size; + return AddEdgeResult::Full; } // At this point, we know there is room. @@ -248,17 +301,22 @@ template class SimpleGrap auto it = std::find(begin, end - 1, dst); // auto it = std::lower_bound(begin, end - 1, dst); if (it != end - 1 && (*it == dst)) { - return current_size; + return AddEdgeResult::AlreadyExists; + } + + auto seq = seq_counters_[src].begin_write(); + + // Insert at the new location using atomic stores. + for (auto dst_it = end - 1, src_it = end - 2; dst_it != it; --dst_it, --src_it) { + std::atomic_ref(*dst_it).store(*src_it, std::memory_order_relaxed); } + std::atomic_ref(*it).store(dst, std::memory_order_relaxed); - // Insert at the new location. - std::copy_backward(it, end - 1, end); - (*it) = dst; + // Update the number of neighbors. + std::atomic_ref(raw_data.front()).store(new_size, std::memory_order_relaxed); - // // Assign the new edge and update the number of neighbors. - // adjacency_list.back() = dst; - raw_data.front() = new_size; - return new_size; + seq_counters_[src].end_write(seq); + return AddEdgeResult::Added; } /// Return the maximum out-degree this graph is capable of containing. @@ -266,13 +324,27 @@ template class SimpleGrap /// Return the number of vertices currently in the graph. size_t n_nodes() const { return data_.size(); } + /// Return the maximum number of vertices this graph can hold without + /// reallocating any of its underlying storage. + size_t capacity() const { + return std::min({data_.capacity(), seq_counters_.capacity(), node_locks_.capacity()} + ); + } + const data_type& get_data() const { return data_; } data_type& get_data() { return data_; } // Resizeable API - void unsafe_resize(size_t new_size) { data_.resize(new_size); } + void unsafe_resize(size_t new_size) { + data_.resize(new_size); + seq_counters_.resize(new_size); + node_locks_.resize(new_size); + } void add_node() { unsafe_resize(n_nodes() + 1); } + /// @brief Access the per-node sequence lock counters for concurrent read validation. + const SeqLockArray& seq_counters() const { return seq_counters_; } + ///// Saving static constexpr lib::Version save_version = lib::Version(0, 0, 0); static constexpr std::string_view serialization_schema = "default_graph"; @@ -370,6 +442,11 @@ template class SimpleGrap protected: data_type data_; Idx max_degree_; + SeqLockArray seq_counters_; + // Grow-stable: a concurrent add_points Phase 3 backprop locks node_locks_[other] + // lock-free while another add grows the array; segmented storage keeps existing + // locks at stable addresses. See svs/lib/segmented_vector.h. + lib::SegmentedVector node_locks_; }; ///// diff --git a/include/svs/core/translation.h b/include/svs/core/translation.h index 4b91a031f..debc0a8d1 100644 --- a/include/svs/core/translation.h +++ b/include/svs/core/translation.h @@ -144,6 +144,59 @@ class IDTranslator { internal_to_external_[internal_id] = lib::narrow(external_id); } + /// + /// @brief Insert mappings, replacing any existing mapping whose current internal + /// ID is considered stale by the caller (e.g. the associated slot has been + /// marked for deletion but the translator entry was not yet cleaned up). + /// + /// Throws if an external ID already maps to a non-stale internal ID (i.e. the + /// user is trying to re-insert a live mapping). In this case, the translator is + /// left partially modified — prior successful replacements are not rolled back. + /// + /// @param external Container of external IDs to insert. + /// @param internal Container of internal IDs to insert (same length as external). + /// @param is_stale Callable ``bool(internal_id_type)``. Returns true if the given + /// internal ID is stale and its existing mapping may be overwritten. + /// + template + void replace_stale_and_insert( + const External& external, const Internal& internal, IsStale&& is_stale + ) { + auto ext_begin = external.begin(); + auto ext_end = external.end(); + auto int_begin = internal.begin(); + auto int_end = internal.end(); + + if (std::distance(ext_begin, ext_end) != std::distance(int_begin, int_end)) { + throw ANNEXCEPTION( + "Length of external IDs is {} while the length of internal IDs is {}!", + std::distance(ext_begin, ext_end), + std::distance(int_begin, int_end) + ); + } + if (!lib::all_unique(ext_begin, ext_end)) { + throw ANNEXCEPTION("External IDs contain repeat elements!"); + } + if (!lib::all_unique(int_begin, int_end)) { + throw ANNEXCEPTION("Internal IDs contain repeat elements!"); + } + + auto e = ext_begin; + auto i = int_begin; + for (; e != ext_end; ++e, ++i) { + auto found = external_to_internal_.find(*e); + if (found != external_to_internal_.end()) { + auto old_internal = found->second; + if (!is_stale(old_internal)) { + throw ANNEXCEPTION("Index already contains external ID {}!", *e); + } + // Stale — erase reverse mapping; forward will be overwritten below. + internal_to_external_.erase(old_internal); + } + insert_translation(*e, *i); + } + } + /// /// @brief Return whether the external ID exists. /// @@ -180,6 +233,26 @@ class IDTranslator { return internal_to_external_.at(i); } + /// @brief Return the external ID, or a default if not found. + external_id_type + get_external_or(internal_id_type i, external_id_type default_val) const { + auto it = internal_to_external_.find(i); + if (it == internal_to_external_.end()) { + return default_val; + } + return it->second; + } + + /// @brief Return the internal ID, or a default if not found. + internal_id_type + get_internal_or(external_id_type e, internal_id_type default_val) const { + auto it = external_to_internal_.find(e); + if (it == external_to_internal_.end()) { + return default_val; + } + return it->second; + } + /// /// @brief Return a start forward iterator over the external->internal IDs. /// diff --git a/include/svs/index/vamana/consolidate.h b/include/svs/index/vamana/consolidate.h index 3a16410f9..e507d9197 100644 --- a/include/svs/index/vamana/consolidate.h +++ b/include/svs/index/vamana/consolidate.h @@ -196,8 +196,21 @@ class GraphConsolidator { all_candidates.clear(); for (auto dst : neighbors) { if (is_deleted(dst)) { - const auto& others = graph_.get_node(dst); - all_candidates.insert(others.begin(), others.end()); + // SeqLock retry: a concurrent consolidate may be writing dst's + // neighbors if dst is not deleted in the other consolidate's view. + for (;;) { + auto maybe_seq = graph_.seq_counters()[dst].read_begin(); + if (!maybe_seq) { + svs::detail::pause(); + continue; + } + const auto& others = graph_.get_node(dst); + all_candidates.insert(others.begin(), others.end()); + if (graph_.seq_counters()[dst].read_validate(*maybe_seq)) { + break; + } + svs::detail::pause(); + } } else { all_candidates.insert(dst); } @@ -250,40 +263,62 @@ class GraphConsolidator { continue; } - // Determine if any of the neighbors of this node are deleted. - const auto& neighbors = graph_.get_node(src); - if (std::none_of(neighbors.begin(), neighbors.end(), is_deleted)) { - continue; - } + // SeqLock retry: a concurrent consolidate's apply_updates may be + // writing src's neighbors while we read them. + for (;;) { + auto maybe_seq = graph_.seq_counters()[src].read_begin(); + if (!maybe_seq) { + svs::detail::pause(); + continue; + } - // Add all neighbors and neighbors-of-deleted-neighbors. - populate_candidates(all_candidates, neighbors, is_deleted); - - // Insert non-deleted candidates into the vector to prepare for pruning. - filter_candidates( - valid_candidates, - all_candidates, - accessor(data_, src), - accessor, - general_distance, - is_deleted - ); + // Determine if any of the neighbors of this node are deleted. + const auto& neighbors = graph_.get_node(src); + if (std::none_of(neighbors.begin(), neighbors.end(), is_deleted)) { + if (graph_.seq_counters()[src].read_validate(*maybe_seq)) { + break; + } + svs::detail::pause(); + continue; + } - size_t new_candidate_size = - std::min(valid_candidates.size(), params_.max_candidate_pool_size); - valid_candidates.resize(new_candidate_size); - heuristic_prune_neighbors( - prune_strategy(distance_), - params_.prune_to, - params_.alpha, - data_, - accessor, - general_distance, - src, - lib::as_const_span(valid_candidates), - final_candidates - ); - update_buffer.insert(i, final_candidates); + // Add all neighbors and neighbors-of-deleted-neighbors. + populate_candidates(all_candidates, neighbors, is_deleted); + + // Insert non-deleted candidates into the vector to prepare for + // pruning. + filter_candidates( + valid_candidates, + all_candidates, + accessor(data_, src), + accessor, + general_distance, + is_deleted + ); + + size_t new_candidate_size = + std::min(valid_candidates.size(), params_.max_candidate_pool_size); + valid_candidates.resize(new_candidate_size); + heuristic_prune_neighbors( + prune_strategy(distance_), + params_.prune_to, + params_.alpha, + data_, + accessor, + general_distance, + src, + lib::as_const_span(valid_candidates), + final_candidates + ); + + if (graph_.seq_counters()[src].read_validate(*maybe_seq)) { + // Consistent read — commit the results. + update_buffer.insert(i, final_candidates); + break; + } + svs::detail::pause(); + // Retry: discard stale candidates, recompute on next iteration. + } } } diff --git a/include/svs/index/vamana/dynamic_index.h b/include/svs/index/vamana/dynamic_index.h index 5f0ce7c16..e0a640fce 100644 --- a/include/svs/index/vamana/dynamic_index.h +++ b/include/svs/index/vamana/dynamic_index.h @@ -17,7 +17,10 @@ #pragma once // stdlib +#include #include +#include +#include // Include the flat index to spin-up exhaustive searches on demand. #include "svs/index/flat/flat.h" @@ -40,6 +43,7 @@ #include "svs/index/vamana/vamana_build.h" #include "svs/lib/boundscheck.h" #include "svs/lib/preprocessor.h" +#include "svs/lib/segmented_vector.h" #include "svs/lib/threads.h" namespace svs::index::vamana { @@ -64,7 +68,15 @@ class MultiMutableVamanaIndex; /// /// Only used for `MutableVamanaIndex`. /// -enum class SlotMetadata : uint8_t { Empty = 0x00, Valid = 0x01, Deleted = 0x02 }; +enum class SlotMetadata : uint8_t { + Empty = 0x00, + Valid = 0x01, + Deleted = 0x02, + // Reserved by an in-flight add_points: slot owned by the adder, vector + // copied, adjacency list being built. Invisible to search, consolidate, + // and subsequent add_points until promoted to Valid. + Pending = 0x04, +}; template inline constexpr std::string_view name(); template <> inline constexpr std::string_view name() { @@ -76,6 +88,9 @@ template <> inline constexpr std::string_view name() { template <> inline constexpr std::string_view name() { return "Deleted"; } +template <> inline constexpr std::string_view name() { + return "Pending"; +} // clang-format off inline constexpr std::string_view name(SlotMetadata metadata) { @@ -84,6 +99,7 @@ inline constexpr std::string_view name(SlotMetadata metadata) { SVS_SWITCH_RETURN(SlotMetadata::Empty) SVS_SWITCH_RETURN(SlotMetadata::Valid) SVS_SWITCH_RETURN(SlotMetadata::Deleted) + SVS_SWITCH_RETURN(SlotMetadata::Pending) } #undef SVS_SWITCH_RETURN throw ANNEXCEPTION("Unreachable!"); @@ -92,19 +108,25 @@ inline constexpr std::string_view name(SlotMetadata metadata) { class ValidBuilder { public: - ValidBuilder(const std::vector& status) + ValidBuilder(const lib::SegmentedVector& status) : status_{status} {} template constexpr PredicatedSearchNeighbor operator()(I i, float distance) const { - bool invalid = getindex(status_, i) == SlotMetadata::Deleted; + // A neighbor is returnable only if its slot is Valid. Deleted slots + // must be skipped; Pending slots are reserved by an in-flight add and + // their vectors/edges are not yet fully published. Empty slots should + // never be reached via a valid edge, but we defend anyway. + bool invalid = + std::atomic_ref(const_cast(getindex(status_, i))) + .load(std::memory_order_acquire) != SlotMetadata::Valid; // This neighbor should be skipped if the metadata corresponding to the given index // marks this slot as deleted. return PredicatedSearchNeighbor(i, distance, !invalid); } private: - const std::vector& status_; + const lib::SegmentedVector& status_; }; template @@ -150,9 +172,39 @@ class MutableVamanaIndex { graph_type graph_; data_type data_; entry_point_type entry_point_; - std::vector status_; + // Grow-stable per-slot metadata: search reads status_[i] lock-free via ValidBuilder + // while a concurrent add_points grows it. See svs/lib/segmented_vector.h. + lib::SegmentedVector status_; size_t first_empty_ = 0; IDTranslator translator_; + // Count of Valid slots. Maintained atomically in add_points/delete_entry. + // Wrapped in unique_ptr because std::atomic is not movable. + std::unique_ptr> num_valid_{ + std::make_unique>(0)}; + // Protects translator access: exclusive for writes (add/consolidate/compact), + // shared for reads (delete/search). Wrapped in unique_ptr for movability. + std::unique_ptr translator_mutex_{ + std::make_unique()}; + // Reserves slot ownership against compact(). Search and the other readers + // (get_distance/reconstruct_at/batch-iterator) hold this shared so that + // compact()'s shrink — which frees trailing segments of the grow-stable + // containers — drains in-flight readers before destroying storage. + // Writers (add_points, delete_entries, consolidate) also hold it shared; + // compact() holds it exclusive. + // + // Lock acquisition order across the codebase: + // compact_mutex_ -> slot_alloc_mutex_ (never reversed) + // compact_mutex_ -> translator_mutex_ (never reversed) + // slot_alloc_mutex_ and translator_mutex_ are never held simultaneously + // (add_points takes them sequentially), so they have no relative order. + std::unique_ptr compact_mutex_{ + std::make_unique()}; + // Writer-only mutex serializing slot allocation in add_points: the Empty-slot + // scan, container growth, Pending stamp, and first_empty_ update. No reader + // ever takes it, so the O(status_.size()) scan does not block delete/search + // translator readers. Held only by add_points (and never together with + // translator_mutex_). + std::unique_ptr slot_alloc_mutex_{std::make_unique()}; // Thread local data structures. distance_type distance_; @@ -192,6 +244,7 @@ class MutableVamanaIndex { , status_(data_.size(), SlotMetadata::Valid) , first_empty_{data_.size()} , translator_() + , num_valid_{std::make_unique>(data_.size())} , distance_{std::move(distance_function)} , threadpool_{threads::as_threadpool(std::move(threadpool_proto))} , search_parameters_{vamana::construct_default_search_parameters(data_)} @@ -219,6 +272,7 @@ class MutableVamanaIndex { , status_(data_.size(), SlotMetadata::Valid) , first_empty_{data_.size()} , translator_() + , num_valid_{std::make_unique>(data_.size())} , distance_(std::move(distance_function)) , threadpool_(threads::as_threadpool(std::move(threadpool_proto))) , search_parameters_(vamana::construct_default_search_parameters(data_)) @@ -285,6 +339,7 @@ class MutableVamanaIndex { , status_{data_.size(), SlotMetadata::Valid} , first_empty_{data_.size()} , translator_{std::move(translator)} + , num_valid_{std::make_unique>(data_.size())} , distance_{distance_function} , threadpool_{std::move(threadpool)} , search_parameters_{config.search_parameters} @@ -357,10 +412,27 @@ class MutableVamanaIndex { /// Idx translate_external_id(size_t e) const { return translator_.get_internal(e); } + /// @brief Translate external ID, returning `default_val` if not mapped. + /// + /// Unlike `translate_external_id`, this does not throw on a missing key. + /// Intended for best-effort readers (e.g. search buffer top-up) that may + /// race with `consolidate()` erasing translator entries. + Idx translate_external_id_or(size_t e, Idx default_val) const { + return translator_.get_internal_or(e, default_val); + } + /// /// @brief Check whether the external ID `e` exists in the index. /// - bool has_id(size_t e) const { return translator_.has_external(e); } + bool has_id(size_t e) const { + if (!translator_.has_external(e)) { + return false; + } + // Check slot is not Deleted (deferred translator cleanup). + auto internal = translator_.get_internal(e); + return std::atomic_ref(const_cast(status_[internal])) + .load(std::memory_order_acquire) == SlotMetadata::Valid; + } /// /// @brief Get the external ID mapped to be `i`. @@ -369,7 +441,11 @@ class MutableVamanaIndex { /// /// Requires that mapping for `i` exists. Otherwise, all bets are off. /// - size_t translate_internal_id(Idx i) const { return translator_.get_external(i); } + size_t translate_internal_id(Idx i) const { + // Use get_external_or to handle concurrent consolidate erasing entries. + // If the entry was erased, return the internal ID as-is (stale result). + return translator_.get_external_or(i, static_cast(i)); + } /// /// @brief Call the functor with all external IDs in the index. @@ -378,8 +454,13 @@ class MutableVamanaIndex { /// each external ID in the index. /// template void on_ids(F&& f) const { + // Skip entries whose slot is Deleted (deferred translator cleanup). for (auto pair : translator_) { - f(pair.first); + auto internal = pair.second; + if (std::atomic_ref(const_cast(status_[internal])) + .load(std::memory_order_acquire) == SlotMetadata::Valid) { + f(pair.first); + } } } @@ -393,11 +474,7 @@ class MutableVamanaIndex { } /// @brief Return the number of **valid** (non-deleted) entries in the index. - size_t size() const { - // NB: Index translation should always be kept in-sync with the number of valid - // elements. - return translator_.size(); - } + size_t size() const { return num_valid_->load(std::memory_order_acquire); } /// /// @brief Translate in-place a collection of internal IDs to external IDs. @@ -421,7 +498,7 @@ class MutableVamanaIndex { template requires(std::tuple_size_v == 2) void translate_to_external(DenseArray& ids) { - // N.B.: lib::narrow_cast should be valid because the origin of the IDs is internal. + std::shared_lock lock{*translator_mutex_}; threads::parallel_for( threadpool_, threads::StaticPartition{getsize<0>(ids)}, @@ -439,7 +516,15 @@ class MutableVamanaIndex { /// /// @brief Get the raw data for external id `e`. /// - auto get_datum(size_t e) const { return data_.get_datum(translate_external_id(e)); } + auto get_datum(size_t e) const { + // Lock order: compact_mutex_ then translator_mutex_ (global order). + std::shared_lock compact_lock{*compact_mutex_}; + std::shared_lock lock{*translator_mutex_}; + if (!translator_.has_external(e)) { + throw ANNEXCEPTION("External ID {} not found in index!", e); + } + return data_.get_datum(translator_.get_internal(e)); + } /// /// @brief Return the dimensionality of the stored dataset. @@ -453,6 +538,26 @@ class MutableVamanaIndex { // This is an internal method, mostly used to help implement the batch iterator. ValidBuilder internal_search_builder() const { return ValidBuilder{status_}; } + /// @brief RAII reader lock guarding data_/graph_ against compact()'s shrink + /// (which frees segments). Used by BatchIterator::next() to protect the + /// greedy traversal — mirrors the shared lock taken by search(). Growth by + /// add_points needs no lock (grow-stable SegmentedVector storage). + /// + /// Acquire this only around graph traversal, and release it before + /// acquiring lock_for_translation(): the two must never be held nested in + /// the compact->translator order reversed, which would invert the global + /// lock order (compact -> translator) and deadlock against compact. + [[nodiscard]] std::shared_lock lock_for_search() const { + return std::shared_lock(*compact_mutex_); + } + + /// @brief RAII reader lock guarding translator_ against erase/remap by + /// consolidate/compact. Used by BatchIterator::next() to protect + /// internal->external ID translation. + [[nodiscard]] std::shared_lock lock_for_translation() const { + return std::shared_lock(*translator_mutex_); + } + auto greedy_search_closure( GreedySearchPrefetchParameters prefetch_parameters, const lib::DefaultPredicate& cancel = lib::Returns(lib::Const()) @@ -486,6 +591,9 @@ class MutableVamanaIndex { scratchspace_type& scratch, const lib::DefaultPredicate& cancel = lib::Returns(lib::Const()) ) const { + // Hold compact_mutex_ shared so compact()'s shrink can't free segments + // mid-traversal. add_points growth is lock-free (grow-stable storage). + std::shared_lock compact_lock{*compact_mutex_}; extensions::single_search( data_, scratch.buffer, @@ -503,36 +611,43 @@ class MutableVamanaIndex { const search_parameters_type& sp, const lib::DefaultPredicate& cancel = lib::Returns(lib::Const()) ) { - threads::parallel_for( - threadpool_, - threads::StaticPartition{queries.size()}, - [&](const auto is, uint64_t SVS_UNUSED(tid)) { - size_t num_neighbors = results.n_neighbors(); - auto buffer = - search_buffer_type{sp.buffer_config_, distance::comparator(distance_)}; - - auto prefetch_parameters = GreedySearchPrefetchParameters{ - sp.prefetch_lookahead_, sp.prefetch_step_}; - - // Legalize search buffer for this search. - if (buffer.target_capacity() < num_neighbors) { - buffer.change_maxsize(num_neighbors); + { + // compact_mutex_ shared: blocks compact()'s segment-freeing shrink + // during the traversal. Released before translate_to_external() takes + // translator_mutex_ to keep the compact->translator lock order. + std::shared_lock compact_lock{*compact_mutex_}; + threads::parallel_for( + threadpool_, + threads::StaticPartition{queries.size()}, + [&](const auto is, uint64_t SVS_UNUSED(tid)) { + size_t num_neighbors = results.n_neighbors(); + auto buffer = search_buffer_type{ + sp.buffer_config_, distance::comparator(distance_)}; + + auto prefetch_parameters = GreedySearchPrefetchParameters{ + sp.prefetch_lookahead_, sp.prefetch_step_}; + + // Legalize search buffer for this search. + if (buffer.target_capacity() < num_neighbors) { + buffer.change_maxsize(num_neighbors); + } + auto scratch = + extensions::per_thread_batch_search_setup(data_, distance_); + + extensions::per_thread_batch_search( + data_, + buffer, + scratch, + queries, + results, + threads::UnitRange{is}, + greedy_search_closure(prefetch_parameters, cancel), + *this, + cancel + ); } - auto scratch = extensions::per_thread_batch_search_setup(data_, distance_); - - extensions::per_thread_batch_search( - data_, - buffer, - scratch, - queries, - results, - threads::UnitRange{is}, - greedy_search_closure(prefetch_parameters, cancel), - *this, - cancel - ); - } - ); + ); + } // Check if request to cancel the search if (cancel()) { @@ -641,55 +756,99 @@ class MutableVamanaIndex { ); } - // Gather all empty slots. + // Reserve slot ownership against compact(). Held for the entire call, + // including the lock-free Phase 2-4 below; compact() takes this + // exclusive and so will block until every in-flight add finishes. + std::shared_lock compact_lock{*compact_mutex_}; + + // Phase 1: Slot allocation under slot_alloc_mutex_ (writer-only; does + // not block delete/search translation). Scan for Empty slots, grow if + // short, stamp the reserved slots Pending, and advance first_empty_. + // + // Order is load-bearing: publishing the reservation (Pending) + // before the mapping means a concurrent delete_entries(E) for an id + // still mid-add sees has_external(E)==false and correctly no-ops, + // rather than counting a delete that the Empty->Pending->Valid slot + // would silently swallow. std::vector slots{}; - slots.reserve(num_points); - bool have_room = false; - - size_t s = reuse_empty ? 0 : first_empty_; - size_t smax = status_.size(); - for (; s < smax; ++s) { - if (status_[s] == SlotMetadata::Empty) { - slots.push_back(s); + { + std::lock_guard lock{*slot_alloc_mutex_}; + + // Gather all empty slots. + slots.reserve(num_points); + bool have_room = false; + + size_t s = reuse_empty ? 0 : first_empty_; + size_t smax = status_.size(); + for (; s < smax; ++s) { + if (status_[s] == SlotMetadata::Empty) { + slots.push_back(s); + } + if (slots.size() == num_points) { + have_room = true; + break; + } + } + + // Check if we have enough indices. If we don't, we need to resize. + if (!have_room) { + size_t needed = num_points - slots.size(); + size_t current_size = data_.size(); + size_t new_size = current_size + needed; + // Grow lock-free w.r.t. readers: the containers are grow-stable + // SegmentedVectors, so a concurrent search subscripting an index + // < its observed size is unaffected by this append. Serialized + // against other writers by slot_alloc_mutex_ held across this block. + data_.resize(new_size); + graph_.unsafe_resize(new_size); + status_.resize(new_size, SlotMetadata::Empty); + + threads::UnitRange extra_points{ + current_size, current_size + needed}; + slots.insert(slots.end(), extra_points.begin(), extra_points.end()); } - if (slots.size() == num_points) { - have_room = true; - break; + assert(slots.size() == num_points); + + // Stamp the reserved slots as Pending. Pending signals "reserved by + // an in-flight add; do not touch" to concurrent consolidate, delete, + // and add. Once stamped, the copy_points/clear_lists below can run + // without the lock because (a) other writers' Empty-slot scans skip + // Pending, and (b) searches' ValidBuilder filters non-Valid slots + // from results. Promoted to Valid after VamanaBuilder::construct. + for (auto s : slots) { + std::atomic_ref(status_[s]) + .store(SlotMetadata::Pending, std::memory_order_release); } - } - // Check if we have enough indices. If we don't, we need to resize the data and - // the graph. - if (!have_room) { - size_t needed = num_points - slots.size(); - size_t current_size = data_.size(); - size_t new_size = current_size + needed; - data_.resize(new_size); - - // Graph resizing marked as un-safe because graph contain internal references - // and thus it's not a good idea to go around shrinking the graph without care. - // - // However, we are only growing here, so resizing will not change any - // invariants. - graph_.unsafe_resize(new_size); - status_.resize(new_size, SlotMetadata::Empty); - - // Append the correct number of extra slots. - threads::UnitRange extra_points{current_size, current_size + needed}; - slots.insert(slots.end(), extra_points.begin(), extra_points.end()); + if (!slots.empty()) { + first_empty_ = std::max(first_empty_, slots.back() + 1); + } } - assert(slots.size() == num_points); - // Try to update the id translation now that we have internal ids. - // If this fails, we still haven't mutated the index data structure so we're safe - // to throw an exception. - translator_.insert(external_ids, slots); + // Phase 2: Publish the id translation under translator_mutex_ exclusive + // A Pending slot belongs to an in-flight adder and must + // not be treated as stale — that would clobber the other adder's mapping. + { + std::lock_guard lock{*translator_mutex_}; + translator_ + .replace_stale_and_insert(external_ids, slots, [this](auto internal) { + return std::atomic_ref( + const_cast(status_[internal]) + ) + .load(std::memory_order_acquire) == SlotMetadata::Deleted; + }); + } - // Copy the given points into the data and clear the adjacency lists for the graph. + // Phase 3: Lock-free data copy and adjacency clearing. + // Slots are Pending: invisible to search (ValidBuilder filters), + // reserved against other writers (Empty-slot scan skips Pending). copy_points(points, slots); clear_lists(slots); - // Patch in the new neighbors. + // Phase 4: Graph construction — runs without lock. + // VamanaBuilder::construct() is thread-safe via per-node spinlock+seqlock. + // note: VamanaBuilder constructor asserts graph_.n_nodes() == data_.size(). + // Both are grown together under the lock above, so this is always consistent. auto parameters = VamanaBuildParameters{ alpha_, graph_.max_degree(), @@ -711,14 +870,14 @@ class MutableVamanaIndex { logger_, logging::Level::Trace}; builder.construct(alpha_, entry_point(), slots, logging::Level::Trace, logger_); - // Mark all added entries as valid. + + // Mark added entries as valid (unique slots per thread, no lock needed). for (const auto& i : slots) { - status_[i] = SlotMetadata::Valid; + std::atomic_ref(status_[i]) + .store(SlotMetadata::Valid, std::memory_order_release); } + num_valid_->fetch_add(slots.size(), std::memory_order_acq_rel); - if (!slots.empty()) { - first_empty_ = std::max(first_empty_, slots.back() + 1); - } return slots; } @@ -745,21 +904,58 @@ class MutableVamanaIndex { /// graph. /// template size_t delete_entries(const T& ids) { - translator_.check_external_exist(ids.begin(), ids.end()); + std::shared_lock compact_lock{*compact_mutex_}; + std::shared_lock lock{*translator_mutex_}; + size_t deleted = 0; for (auto i : ids) { + if (!translator_.has_external(i)) { + continue; // Already deleted + consolidated, or never existed. + } delete_entry(translator_.get_internal(i)); + ++deleted; } - translator_.delete_external(ids); - return ids.size(); + // Don't erase translator entries here — concurrent search may still + // need them for translate_to_external(). Cleanup happens in + // consolidate()/compact() when deleted slots become empty. + return deleted; } void delete_entry(size_t i) { - SlotMetadata& meta = getindex(status_, i); - assert(meta == SlotMetadata::Valid); - meta = SlotMetadata::Deleted; + auto& meta = getindex(status_, i); + auto ref = std::atomic_ref(meta); + // CAS Valid → Deleted. If the slot is Pending (concurrent adder still + // in phase 2), wait for the adder to promote it to Valid before we + // can soft-delete; otherwise the delete would be silently lost. Only + // the thread that successfully transitions decrements num_valid_; + // double-deletes silently no-op. + for (;;) { + SlotMetadata expected = SlotMetadata::Valid; + if (ref.compare_exchange_strong( + expected, + SlotMetadata::Deleted, + std::memory_order_acq_rel, + std::memory_order_relaxed + )) { + num_valid_->fetch_sub(1, std::memory_order_acq_rel); + return; + } + if (expected != SlotMetadata::Pending) { + // Already Deleted or Empty — no-op. + return; + } + // Pending: adder's Pending → Valid store is imminent; spin. + svs::detail::pause(); + } } - bool is_deleted(size_t i) const { return status_[i] != SlotMetadata::Valid; } + bool is_deleted(size_t i) const { + // True only for slots that have been soft-deleted. Pending (in-flight + // add) and Empty are NOT deleted: consolidate must not prune them out + // of other nodes' adjacency lists, and search already filters + // non-Valid slots via ValidBuilder. + return std::atomic_ref(const_cast(status_[i])) + .load(std::memory_order_acquire) == SlotMetadata::Deleted; + } Idx entry_point() const { assert(entry_point_.size() == 1); @@ -767,15 +963,21 @@ class MutableVamanaIndex { } /// - /// @brief Return all the non-missing internal IDs. - /// - /// This includes both valid and soft-deleted entries. + /// @brief Return all internal IDs whose slot is Valid (live). /// + /// Used by compact() to pick the surviving set. Pending slots (in-flight + /// adds) are excluded — compact is only safe to run when the caller has + /// ensured no Pending slots exist (compact holds translator_mutex_ + /// exclusive, which prevents a new add from entering phase 1, but an add + /// that reached phase 2 before compact grabbed the lock may still be + /// publishing status Pending → Valid; the compact caller must quiesce + /// these adds first). std::vector nonmissing_indices() const { auto indices = std::vector(); indices.reserve(size()); for (size_t i = 0, imax = status_.size(); i < imax; ++i) { - if (!is_deleted(i)) { + if (std::atomic_ref(const_cast(status_[i])) + .load(std::memory_order_acquire) == SlotMetadata::Valid) { indices.push_back(i); } } @@ -789,6 +991,14 @@ class MutableVamanaIndex { /// improve performance but requires more working memory. /// void compact(Idx batch_size = 1'000) { + // Exclusive against all writers (add_points/delete_entries/consolidate). + // shared_mutex semantics ensure that by the time we acquire the lock, + // every prior writer has released its shared lock — including + // add_points's lock-free Phase 2-4. After acquisition, no new writer + // can start until we release. Search does not take this mutex and + // continues to run. + std::lock_guard compact_lock{*compact_mutex_}; + // Step 1: Compute a prefix-sum matching each valid internal index to its new // internal index. // @@ -820,6 +1030,8 @@ class MutableVamanaIndex { auto this_batch = batch_to_new_id_map.eachindex(); // Copy the graph into the temporary buffer and remap the IDs. + // Edges to non-Valid (Deleted) slots are dropped — those slots + // do not survive compaction, so the edge would dangle. threads::parallel_for( threadpool_, threads::StaticPartition(this_batch), @@ -830,17 +1042,15 @@ class MutableVamanaIndex { auto old_id = new_to_old_id_map[new_id]; const auto& list = graph_.get_node(old_id); - buffer.resize(list.size()); - - // Transform the adjacency list from old to new. - std::transform( - list.begin(), - list.end(), - buffer.begin(), - [&old_to_new_id_map](Idx old_id) { - return old_to_new_id_map.at(old_id); + buffer.clear(); + buffer.reserve(list.size()); + + for (auto neighbor_old : list) { + auto it = old_to_new_id_map.find(neighbor_old); + if (it != old_to_new_id_map.end()) { + buffer.push_back(it->second); } - ); + } temp_graph.replace_node(batch_id, buffer); } @@ -862,30 +1072,42 @@ class MutableVamanaIndex { } ///// Finishing steps. - // Resize the graph and data. - graph_.unsafe_resize(max_index); - data_.resize(max_index); - first_empty_ = max_index; - - // Compact metadata and ID remapping. - for (size_t new_id = 0; new_id < max_index; ++new_id) { - auto old_id = getindex(new_to_old_id_map, new_id); - // No work to be done if there was no remapping. - if (new_id == old_id) { - continue; - } + { + std::lock_guard lock{*translator_mutex_}; + // Shrink the graph and data. compact_mutex_ is held exclusive for the + // whole compact(), so all in-flight readers have drained — freeing + // trailing segments here cannot dangle a concurrent search. + graph_.unsafe_resize(max_index); + data_.resize(max_index); + first_empty_ = max_index; + + // Compact metadata and ID remapping. + for (size_t new_id = 0; new_id < max_index; ++new_id) { + auto old_id = getindex(new_to_old_id_map, new_id); + if (new_id == old_id) { + continue; + } - auto status = getindex(status_, old_id); - status_[new_id] = status; - if (status == SlotMetadata::Valid) { - translator_.remap_internal_id(old_id, new_id); + auto status = getindex(status_, old_id); + status_[new_id] = status; + if (status == SlotMetadata::Valid) { + translator_.remap_internal_id(old_id, new_id); + } + } + status_.resize(max_index); + + // Update entry points. If an entry point is no longer present + // (e.g. it was Deleted prior to compact), fall back to internal + // ID 0 — by construction max_index > 0 implies a survivor. + for (auto& ep : entry_point_) { + auto it = old_to_new_id_map.find(ep); + if (it != old_to_new_id_map.end()) { + ep = it->second; + } else { + assert(max_index > 0); + ep = 0; + } } - } - status_.resize(max_index); - - // Update entry points. - for (auto& ep : entry_point_) { - ep = old_to_new_id_map.at(ep); } } @@ -948,6 +1170,7 @@ class MutableVamanaIndex { ///// Mutation void consolidate() { + std::shared_lock compact_lock{*compact_mutex_}; auto check_is_deleted = [&](size_t i) { return this->is_deleted(i); }; std::function valid = [&](size_t i) { return !(this->is_deleted(i)); @@ -978,10 +1201,26 @@ class MutableVamanaIndex { check_is_deleted ); - // After consolidation - set all `Deleted` slots to `Empty`. - for (auto& status : status_) { - if (status == SlotMetadata::Deleted) { - status = SlotMetadata::Empty; + // After consolidation - clean up deleted slots under lock. + { + std::lock_guard lock{*translator_mutex_}; + // Erase translator entries for deleted slots (deferred from delete_entries). + // Skip entries already absent — add_points with replace_stale_and_insert + // may have reassigned the external ID and erased the stale reverse entry. + std::vector deleted_internal_ids; + for (size_t i = 0, imax = status_.size(); i < imax; ++i) { + if (status_[i] == SlotMetadata::Deleted && translator_.has_internal(i)) { + deleted_internal_ids.push_back(i); + } + } + if (!deleted_internal_ids.empty()) { + translator_.delete_internal(deleted_internal_ids, false); + } + // Set all `Deleted` slots to `Empty`. + for (size_t i = 0, imax = status_.size(); i < imax; ++i) { + if (status_[i] == SlotMetadata::Deleted) { + status_[i] = SlotMetadata::Empty; + } } } } @@ -1140,6 +1379,12 @@ class MutableVamanaIndex { ); } + // Lock order: compact_mutex_ then translator_mutex_ (global order). + // compact_mutex_ shared guards data_/graph_ against compact()'s shrink; + // translator_mutex_ shared guards the ID translation reads below. + std::shared_lock compact_lock{*compact_mutex_}; + std::shared_lock lock{*translator_mutex_}; + // Bounds checking. for (size_t i = 0; i < ids_size; ++i) { I id = ids[i]; // inbounds by loop bounds. @@ -1239,6 +1484,11 @@ class MutableVamanaIndex { case SlotMetadata::Empty: { return false; } + case SlotMetadata::Pending: { + // In-flight add: edges may be only partially published. + // Treat as not-yet-live for consistency checking. + return false; + } } // Make GCC happy. return false; @@ -1271,6 +1521,12 @@ class MutableVamanaIndex { /// @brief Compute the distance between an external vector and a vector in the index. template double get_distance(const ExternalId& external_id, const Query& query) const { + // Lock order: compact_mutex_ then translator_mutex_ (global order). + // compact_mutex_ shared guards data_ against compact()'s shrink; + // translator_mutex_ shared guards the ID translation read. + std::shared_lock compact_lock{*compact_mutex_}; + std::shared_lock lock{*translator_mutex_}; + // Check if the external ID exists if (!has_id(external_id)) { throw ANNEXCEPTION( diff --git a/include/svs/index/vamana/extensions.h b/include/svs/index/vamana/extensions.h index 96ffeaa06..6854cea72 100644 --- a/include/svs/index/vamana/extensions.h +++ b/include/svs/index/vamana/extensions.h @@ -27,6 +27,8 @@ #include "svs/lib/preprocessor.h" #include "svs/lib/threads.h" +#include + namespace svs::index::vamana::extensions { ///// @@ -437,8 +439,16 @@ void check_and_supplement_search_buffer( if (search_buffer.valid() < search_buffer.target_window() && search_buffer.valid() < index.size()) { search_buffer.sort(); + // Snapshot of external IDs may contain entries that a concurrent + // `consolidate()` erases before we reach them. Use the non-throwing + // `_or` lookup and skip any ID that's no longer mapped. + using IdT = decltype(index.translate_external_id(0)); + constexpr IdT kMissing = std::numeric_limits::max(); for (auto external_id : index.external_ids()) { - auto internal_id = index.translate_external_id(external_id); + auto internal_id = index.translate_external_id_or(external_id, kMissing); + if (internal_id == kMissing) { + continue; + } auto dist = index.get_distance(external_id, query); auto builder = index.internal_search_builder(); search_buffer.insert(builder(internal_id, dist)); diff --git a/include/svs/index/vamana/greedy_search.h b/include/svs/index/vamana/greedy_search.h index f12c0129f..ac91331fe 100644 --- a/include/svs/index/vamana/greedy_search.h +++ b/include/svs/index/vamana/greedy_search.h @@ -20,6 +20,8 @@ #include "svs/concepts/distance.h" #include "svs/concepts/graph.h" #include "svs/index/vamana/search_buffer.h" +#include "svs/lib/concurrency/seqlock.h" +#include "svs/lib/spinlock.h" #include #include @@ -159,45 +161,62 @@ void greedy_search( const auto& node = search_buffer.next(); auto node_id = node.id(); - // Get the adjacency list for this vertex and prepare prefetching logic. - auto neighbors = graph.get_node(node_id); - const size_t num_neighbors = neighbors.size(); - search_tracker.visited(Neighbor{node}, neighbors.size()); - - auto prefetcher = lib::make_prefetcher( - lib::PrefetchParameters{ - prefetch_parameters.lookahead, prefetch_parameters.step}, - num_neighbors, - [&](size_t i) { accessor.prefetch(dataset, neighbors[i]); }, - [&](size_t i) { - // Perform the visited set enabled check just once. - if (search_buffer.visited_set_enabled()) { - // Prefetch next bucket so it's (hopefully) in the cache when we next - // consult the visited filter. - if (i + 1 < num_neighbors) { - search_buffer.unsafe_prefetch_visited(neighbors[i + 1]); + for (;;) { // SeqLock retry loop + auto maybe_seq = graph.seq_counters()[node_id].read_begin(); + if (!maybe_seq) { + detail::pause(); + continue; + } + + // Get the adjacency list for this vertex and prepare prefetching logic. + auto neighbors = graph.get_node(node_id); + const size_t num_neighbors = neighbors.size(); + search_tracker.visited(Neighbor{node}, num_neighbors); + + auto prefetcher = lib::make_prefetcher( + lib::PrefetchParameters{ + prefetch_parameters.lookahead, prefetch_parameters.step}, + num_neighbors, + [&](size_t i) { accessor.prefetch(dataset, neighbors[i]); }, + [&](size_t i) { + // Perform the visited set enabled check just once. + if (search_buffer.visited_set_enabled()) { + // Prefetch next bucket so it's (hopefully) in the cache when + // we next consult the visited filter. + if (i + 1 < num_neighbors) { + search_buffer.unsafe_prefetch_visited(neighbors[i + 1]); + } + return !search_buffer.unsafe_is_visited(neighbors[i]); } - return !search_buffer.unsafe_is_visited(neighbors[i]); + + // Otherwise, always prefetch the next data item. + return true; } + ); - // Otherwise, always prefetch the next data item. - return true; - } - ); + ///// Neighbor expansion. + prefetcher(); + for (auto id : neighbors) { + if (search_buffer.emplace_visited(id)) { + continue; + } - ///// Neighbor expansion. - prefetcher(); - for (auto id : neighbors) { - if (search_buffer.emplace_visited(id)) { - continue; - } + // Run the prefetcher. + prefetcher(); - // Run the prefetcher. - prefetcher(); + // Compute distance and update search buffer. + auto dist = + distance::compute(distance_function, query, accessor(dataset, id)); + search_buffer.insert(builder(id, dist)); + } - // Compute distance and update search buffer. - auto dist = distance::compute(distance_function, query, accessor(dataset, id)); - search_buffer.insert(builder(id, dist)); + // Validate that no concurrent write occurred during the read. + if (graph.seq_counters()[node_id].read_validate(*maybe_seq)) { + break; // Consistent read — proceed to the next node. + } + detail::pause(); + // Retry: stale entries from the invalid read remain in the search buffer. + // They have valid IDs and distances, and insert() deduplicates by ID. } } } diff --git a/include/svs/index/vamana/index.h b/include/svs/index/vamana/index.h index 70a921353..556ae5486 100644 --- a/include/svs/index/vamana/index.h +++ b/include/svs/index/vamana/index.h @@ -465,6 +465,13 @@ class VamanaIndex { // This is an internal method, mostly used to help implement the batch iterator. static NeighborBuilder internal_search_builder() { return NeighborBuilder(); } + // The static index is immutable after construction, so the batch iterator + // needs no locking. These return an empty guard so BatchIterator::next() + // can take the locks uniformly across static and dynamic indexes + struct NullLockGuard {}; + [[nodiscard]] NullLockGuard lock_for_search() const { return {}; } + [[nodiscard]] NullLockGuard lock_for_translation() const { return {}; } + auto greedy_search_closure( GreedySearchPrefetchParameters prefetch_parameters, const lib::DefaultPredicate& cancel = lib::Returns(lib::Const()) diff --git a/include/svs/index/vamana/iterator.h b/include/svs/index/vamana/iterator.h index 44d76fb81..181ec90ba 100644 --- a/include/svs/index/vamana/iterator.h +++ b/include/svs/index/vamana/iterator.h @@ -257,61 +257,76 @@ template class BatchIterator { bool restart_search_copy = std::exchange(restart_search_, true); - parent_->experimental_escape_hatch([&]( - const auto& graph, - const auto& data, - const auto& SVS_UNUSED(distance), - std::span entry_points - ) { - auto search_closure = - [&](const auto& query, const auto& accessor, auto& d, auto& buffer) { - constexpr vamana::extensions::UsesReranking< - std::remove_const_t>> - uses_reranking{}; - if constexpr (uses_reranking()) { - distance::maybe_fix_argument(d, query); - for (size_t j = 0, jmax = buffer.size(); j < jmax; ++j) { - auto& neighbor = buffer[j]; - auto id = neighbor.id(); - auto new_distance = - distance::compute(d, query, data.get_primary(id)); - neighbor.set_distance(new_distance); + // Hold the search lock (compact_mutex_ shared for a dynamic index; a + // no-op for a static one) so a concurrent compact() cannot free the + // segments under us. add_points growth is lock-free (grow-stable + // storage). The guard is released at the end of this scope — before + // acquiring the translation lock. The two locks must never be held + // nested in the translator-before-compact order: that would invert the + // global lock order (compact -> translator) and deadlock against compact. + { + [[maybe_unused]] auto search_guard = parent_->lock_for_search(); + parent_->experimental_escape_hatch([&]( + const auto& graph, + const auto& data, + const auto& SVS_UNUSED(distance), + std::span entry_points + ) { + auto search_closure = + [&](const auto& query, const auto& accessor, auto& d, auto& buffer) { + constexpr vamana::extensions::UsesReranking< + std::remove_const_t>> + uses_reranking{}; + if constexpr (uses_reranking()) { + distance::maybe_fix_argument(d, query); + for (size_t j = 0, jmax = buffer.size(); j < jmax; ++j) { + auto& neighbor = buffer[j]; + auto id = neighbor.id(); + auto new_distance = + distance::compute(d, query, data.get_primary(id)); + neighbor.set_distance(new_distance); + } + buffer.sort(); } - buffer.sort(); - } - - vamana::greedy_search( - graph, - data, - accessor, - query, - d, - buffer, - RestartInitializer{entry_points, restart_search_copy}, - parent_->internal_search_builder(), - scratchspace_.prefetch_parameters, - cancel - ); - - if constexpr (Index::needs_id_translation) { - buffer.cleanup(); - buffer.sort(); - } - }; - - extensions::single_search( - data, - scratchspace_.buffer, - scratchspace_.scratch, - lib::as_const_span(query_), - search_closure, - *parent_ - ); - }); + + vamana::greedy_search( + graph, + data, + accessor, + query, + d, + buffer, + RestartInitializer{entry_points, restart_search_copy}, + parent_->internal_search_builder(), + scratchspace_.prefetch_parameters, + cancel + ); + + if constexpr (Index::needs_id_translation) { + buffer.cleanup(); + buffer.sort(); + } + }; + + extensions::single_search( + data, + scratchspace_.buffer, + scratchspace_.scratch, + lib::as_const_span(query_), + search_closure, + *parent_ + ); + }); + } ++iteration_; restart_search_ = false; - copy_from_scratch(batch_size); + // Hold the translation lock (translator_mutex_) so a + // concurrent consolidate/compact cannot erase or remap entries mid-copy. + { + [[maybe_unused]] auto translation_guard = parent_->lock_for_translation(); + copy_from_scratch(batch_size); + } // If result is empty after calling next(), mark the iterator as exhausted. // The iterator will not be able to find any more neighbors. if (results_.size() == 0 && batch_size > 0) { diff --git a/include/svs/index/vamana/vamana_build.h b/include/svs/index/vamana/vamana_build.h index 77d5ceadb..054473a11 100644 --- a/include/svs/index/vamana/vamana_build.h +++ b/include/svs/index/vamana/vamana_build.h @@ -131,6 +131,16 @@ template class BackedgeBuffer { void add_edge(Idx src, Idx dst) { // Get the bucket that the source vertex belongs to. size_t bucket = src / bucket_size_; + // The bucket array is sized once from the graph size snapshotted at + // VamanaBuilder construction. A concurrent add_points may have grown the + // graph past that snapshot, so a greedy-search neighbor `src` can fall + // beyond our bucket range. That node belongs to another in-flight add and + // gets its own back-edges from that add's builder; drop the overflow edge + // here rather than indexing out of bounds — consistent with the + // best-effort dropping already done on AddEdgeResult::Full. + if (bucket >= buckets_.size()) { + return; + } // Lock the bucket and update the adjacency list. std::lock_guard lock(bucket_locks_.at(bucket)); @@ -194,7 +204,6 @@ class VamanaBuilder { , params_{params} , prefetch_hint_{prefetch_hint} , threadpool_{threadpool} - , vertex_locks_(data.size()) , backedge_buffer_{data.size(), 1000} { // Print all parameters svs::logging::log( @@ -210,12 +219,10 @@ class VamanaBuilder { params.window_size, params.use_full_search_history ); - // Check class invariants. - if (graph_.n_nodes() != data_.size()) { - throw ANNEXCEPTION( - "Expected graph to be pre-allocated with {} vertices!", data_.size() - ); - } + // Note: graph/data size invariant (graph_.n_nodes() == data_.size()) is + // maintained under mutation_mutex_ in add_points(). During concurrent + // add_points() calls, sizes may temporarily differ between lock release + // and this point, but both are always >= the slots we will operate on. } void construct( @@ -494,10 +501,11 @@ class VamanaBuilder { [&](const auto& is, uint64_t SVS_UNUSED(tid)) { for (auto node_id : is) { for (auto other_id : graph_.get_node(node_id)) { - std::lock_guard lock{vertex_locks_[other_id]}; - if (graph_.get_node_degree(other_id) < params_.graph_max_degree) { - graph_.add_edge(other_id, node_id); - } else { + // graph_.add_edge is atomic under node_locks_[other_id]. + // If it reports Full, route to the overflow buffer — + // no TOCTOU race between a pre-check and the insert. + if (graph_.add_edge(other_id, node_id) == + graphs::AddEdgeResult::Full) { backedge_buffer_.add_edge(other_id, node_id); } } @@ -591,8 +599,6 @@ class VamanaBuilder { GreedySearchPrefetchParameters prefetch_hint_; /// Worker threadpool. Pool& threadpool_; - /// Per-vertex locks. - std::vector vertex_locks_; /// Overflow backedge buffer. BackedgeBuffer backedge_buffer_; }; diff --git a/include/svs/lib/concurrency/atomic_span.h b/include/svs/lib/concurrency/atomic_span.h new file mode 100644 index 000000000..4d7d4ae50 --- /dev/null +++ b/include/svs/lib/concurrency/atomic_span.h @@ -0,0 +1,93 @@ +/* + * Copyright 2026 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace svs { + +/// +/// @brief A non-owning, zero-copy view over a contiguous range of ``T`` that performs +/// atomic loads on every element access. +/// +/// Each dereference uses ``std::atomic_ref::load(std::memory_order_relaxed)``. +/// On x86, this compiles to a plain MOV instruction — identical to non-atomic access. +/// +/// This type is designed to be used as a drop-in replacement for ``std::span`` +/// when concurrent reads and writes are possible, ensuring no undefined behavior +/// while maintaining zero-copy semantics. +/// +template class AtomicSpan { + public: + using value_type = std::remove_const_t; + + class iterator { + public: + using value_type = AtomicSpan::value_type; + using difference_type = std::ptrdiff_t; + using iterator_category = std::input_iterator_tag; + + explicit iterator(const T* p) + : ptr_(p) {} + + value_type operator*() const { + return std::atomic_ref(const_cast(*ptr_)) + .load(std::memory_order_relaxed); + } + + iterator& operator++() { + ++ptr_; + return *this; + } + + iterator operator++(int) { + auto tmp = *this; + ++ptr_; + return tmp; + } + + bool operator==(const iterator& other) const { return ptr_ == other.ptr_; } + bool operator!=(const iterator& other) const { return ptr_ != other.ptr_; } + + private: + const T* ptr_; + }; + + AtomicSpan(const T* data, size_t size) + : data_(data) + , size_(size) {} + + size_t size() const { return size_; } + bool empty() const { return size_ == 0; } + const T* data() const { return data_; } + + value_type operator[](size_t i) const { + return std::atomic_ref(const_cast(data_[i])) + .load(std::memory_order_relaxed); + } + + iterator begin() const { return iterator{data_}; } + iterator end() const { return iterator{data_ + size_}; } + + private: + const T* data_; + size_t size_; +}; + +} // namespace svs diff --git a/include/svs/lib/concurrency/seqlock.h b/include/svs/lib/concurrency/seqlock.h new file mode 100644 index 000000000..7fe3ea70e --- /dev/null +++ b/include/svs/lib/concurrency/seqlock.h @@ -0,0 +1,141 @@ +/* + * Copyright 2026 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "svs/lib/segmented_vector.h" + +#include +#include +#include +#include +#include + +namespace svs { + +/// +/// @brief Per-element sequence lock counter for reader-writer synchronization. +/// +/// Uses a uint8_t counter: odd values indicate a write in progress, even values indicate +/// a stable state. +/// +/// **Writer-writer serialization is the caller's responsibility.** Only one writer +/// at a time may call ``begin_write``/``end_write`` on a given counter. Use an external +/// lock (e.g., per-node ``SpinLock``) to serialize concurrent writers to the same element. +/// +class SeqLockCounter { + using counter_type = uint8_t; + + public: + SeqLockCounter() = default; + + SeqLockCounter(const SeqLockCounter& other) + : seq_(other.seq_.load(std::memory_order_relaxed)) {} + + SeqLockCounter& operator=(const SeqLockCounter& other) { + seq_.store(other.seq_.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + + SeqLockCounter(SeqLockCounter&& other) noexcept + : seq_(other.seq_.load(std::memory_order_relaxed)) {} + + SeqLockCounter& operator=(SeqLockCounter&& other) noexcept { + seq_.store(other.seq_.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + + /// + /// @brief Begin a write operation. Returns the pre-write sequence value. + /// + /// Increments the counter to an odd value, signaling to readers that a write is in + /// progress. The returned value must be passed to ``end_write``. + /// + counter_type begin_write() { + auto seq = seq_.load(std::memory_order_relaxed); + seq_.store(seq + 1, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_release); + return seq; + } + + /// + /// @brief End a write operation. + /// + /// @param seq The value returned by the corresponding ``begin_write`` call. + /// + /// Increments the counter to an even value, signaling that the write is complete + /// and data is consistent. + /// + void end_write(counter_type seq) { seq_.store(seq + 2, std::memory_order_release); } + + /// + /// @brief Begin a read operation. + /// + /// @returns The current sequence value if it is even (no write in progress), + /// or ``std::nullopt`` if a write is in progress. + /// + /// The returned value (if present) must be passed to ``read_validate`` after the + /// read is complete. + /// + std::optional read_begin() const { + auto seq = seq_.load(std::memory_order_acquire); + if (seq % 2 > 0) { + return std::nullopt; + } + return seq; + } + + /// + /// @brief Validate that no write occurred during the read. + /// + /// @param seq The value returned by ``read_begin``. + /// + /// @returns ``true`` if the data read between ``read_begin`` and ``read_validate`` + /// is consistent (no concurrent write occurred). + /// + bool read_validate(counter_type seq) const { + std::atomic_thread_fence(std::memory_order_acquire); + return seq_.load(std::memory_order_relaxed) == seq; + } + + private: + std::atomic seq_{0}; +}; + +/// +/// @brief Array of SeqLock counters, one per element (e.g., one per graph node). +/// +class SeqLockArray { + public: + SeqLockArray() = default; + explicit SeqLockArray(size_t n) + : counters_(n) {} + + SeqLockCounter& operator[](size_t i) { return counters_[i]; } + const SeqLockCounter& operator[](size_t i) const { return counters_[i]; } + + void resize(size_t n) { counters_.resize(n); } + size_t size() const { return counters_.size(); } + size_t capacity() const { return counters_.capacity(); } + + private: + // Grow-stable storage: appending counters never relocates existing ones, so a + // concurrent lock-free reader (greedy_search reading seq_counters_[i]) is safe + // against a writer's grow. See svs/lib/segmented_vector.h. + lib::SegmentedVector counters_; +}; + +} // namespace svs diff --git a/include/svs/lib/segmented_vector.h b/include/svs/lib/segmented_vector.h new file mode 100644 index 000000000..7f642ae9c --- /dev/null +++ b/include/svs/lib/segmented_vector.h @@ -0,0 +1,330 @@ +/* + * Copyright 2026 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "svs/lib/boundscheck.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace svs::lib { + +/// +/// @brief An unbounded, grow-stable vector for single-writer/many-reader use. +/// +/// Two-level "lock-free dynamic array" (Dechev et al.). A fixed top-level directory of +/// ``kDirBuckets`` bucket pointers; directory bucket ``k`` is a single contiguous +/// heap array of ``kFirstBucket << k`` elements (the first bucket holds ``kFirstBucket``, +/// each subsequent bucket doubles). Grouping elements into chunks of ``kFirstBucket`` and +/// applying the power-of-two layout to the chunk index gives: ``q = i / kFirstBucket``, +/// ``bucket = floor(log2(q + 1))``, with the bucket's first global index +/// ``kFirstBucket * (2^bucket - 1)``. Sixty-four buckets address far more than any real +/// dataset, so there is no practical size cap. +/// +/// The directory array is a fixed member (never relocates), and each bucket array is +/// allocated once and never moved or reallocated. Therefore the address of any element +/// ``i < size()`` is stable for the lifetime of that element — a concurrent reader +/// indexing element ``i`` is unaffected by appends that grow the structure past ``i``. +/// +/// Concurrency contract: +/// * **One writer at a time.** ``resize`` / ``push_back`` / ``pop_back`` / ``shrink_to`` +/// (the only operations that change the structure) must be serialized by the caller +/// (e.g. under a mutex). +/// * **Many concurrent readers.** ``operator[]`` and ``size`` may run concurrently with a +/// writer's *grow*: a new bucket is allocated and its elements constructed, the bucket +/// pointer is published with a release store, and ``size_`` is bumped last (release). A +/// reader does an acquire load of ``size_`` then an acquire load of the bucket pointer, +/// so for any ``i < size()`` it observes, the bucket and element are fully published. +/// * **Shrink frees storage.** ``shrink_to`` destroys trailing elements and frees buckets +/// that lie entirely above the new size; a reader holding a reference to a freed element +/// would dangle, so the caller must drain readers (e.g. via an exclusive lock) first. +/// +/// This mirrors the std::vector subset used by the dynamic Vamana index: ``operator[]``, +/// ``at``, ``size``, ``empty``, ``capacity``, ``resize(n)``, ``resize(n, fill)``, +/// ``push_back``, ``pop_back``, ``shrink_to(n)``. +/// +template class SegmentedVector { + static constexpr std::size_t kDirBuckets = 64; + // Number of elements in the first directory bucket. Bucket ``k`` then holds + // ``kFirstBucket << k`` elements, so a small first bucket means many tiny + // allocations near the start while a large one front-loads capacity. + static constexpr std::size_t kFirstBucket = 1; + static_assert( + (kFirstBucket & (kFirstBucket - 1)) == 0, "kFirstBucket must be a power of two" + ); + + public: + using value_type = T; + using size_type = std::size_t; + using reference = T&; + using const_reference = const T&; + + SegmentedVector() = default; + explicit SegmentedVector(size_type n) { resize(n); } + SegmentedVector(size_type n, const T& fill) { resize(n, fill); } + + SegmentedVector(const SegmentedVector& other) { copy_from_(other); } + SegmentedVector& operator=(const SegmentedVector& other) { + if (this != &other) { + destroy_all_(); + copy_from_(other); + } + return *this; + } + + SegmentedVector(SegmentedVector&& other) noexcept { steal_from_(other); } + SegmentedVector& operator=(SegmentedVector&& other) noexcept { + if (this != &other) { + destroy_all_(); + steal_from_(other); + } + return *this; + } + + ~SegmentedVector() { destroy_all_(); } + + /// + /// @brief Access element ``i``. Precondition: ``i < size()``. + /// + /// Safe to call concurrently with a writer's grow ``resize``/``push_back``, provided + /// ``i`` was ``< size()`` as observed by the reader. + /// + const_reference operator[](size_type i) const noexcept { + auto [b, off] = locate_(i); + return dir_[b].load(std::memory_order_acquire)[off]; + } + reference operator[](size_type i) noexcept { + const auto& self = *this; + return const_cast(self[i]); + } + + /// @brief Bounds-checked access (used by ``svs::getindex`` when bounds checking is on). + reference at(size_type i) { + if (i >= size()) { + throw std::out_of_range("SegmentedVector::at index out of range"); + } + return (*this)[i]; + } + const_reference at(size_type i) const { + if (i >= size()) { + throw std::out_of_range("SegmentedVector::at index out of range"); + } + return (*this)[i]; + } + + size_type size() const noexcept { return size_.load(std::memory_order_acquire); } + bool empty() const noexcept { return size() == 0; } + + /// @brief Logical capacity: number of elements addressable without allocating a new + /// bucket. With ``m`` buckets this is ``kFirstBucket * (2^m - 1)``. + size_type capacity() const noexcept { return bucket_first_index_(allocated_buckets_); } + + /// @brief Grow or shrink the logical size. New elements are default-constructed. + /// Single-writer; concurrent readers safe on grow (see class contract). + void resize(size_type n) { resize_impl_(n, nullptr); } + + /// @brief Grow or shrink the logical size, filling new elements with ``fill``. + void resize(size_type n, const T& fill) { resize_impl_(n, &fill); } + + /// @brief Append one element, move-*constructing* it into the new slot. + /// + /// Grows logical size by one, allocating a new bucket if needed. The element is + /// constructed in place via T's move constructor, so types whose move-*assignment* is + /// unavailable or expensive (e.g. DenseArray, whose move-assign compares allocators) + /// still work. The element is published (bucket pointer first, then ``size_``) so a + /// concurrent reader that observes the new ``size()`` sees the constructed value. + /// Single-writer. + void push_back(T&& value) { + size_type i = size_.load(std::memory_order_relaxed); + auto [b, off] = locate_(i); + T* bucket = ensure_bucket_(b); + new (&bucket[off]) T(std::move(value)); + size_.store(i + 1, std::memory_order_release); + } + + /// @brief Drop the last element, destroying it (logical only; does not free the + /// bucket). Single-writer; caller must have drained readers if a bucket is later freed. + void pop_back() { + size_type i = size_.load(std::memory_order_relaxed); + if (i > 0) { + auto [b, off] = locate_(i - 1); + dir_[b].load(std::memory_order_relaxed)[off].~T(); + size_.store(i - 1, std::memory_order_release); + } + } + + /// @brief Shrink to ``n`` elements, destroying the dropped elements and freeing buckets + /// that lie entirely above ``n``. Single-writer; caller must have drained readers. + void shrink_to(size_type n) { + size_type old = size_.load(std::memory_order_relaxed); + if (n >= old) { + return; + } + // Stop readers from seeing the elements about to be destroyed. + size_.store(n, std::memory_order_release); + destroy_range_(n, old); + free_buckets_above_(n); + } + + private: + // Fixed top-level directory. Bucket k (when non-null) is a contiguous heap array of + // (kFirstBucket << k) elements; the elements with global index < size_ are constructed. + std::atomic dir_[kDirBuckets] = {}; + std::atomic size_{0}; + size_type allocated_buckets_{0}; + + // Number of elements held by bucket ``b`` (= kFirstBucket << b). + static constexpr size_type bucket_size_(size_type b) noexcept { + return kFirstBucket << b; + } + + // First global element index held by bucket ``b`` (= kFirstBucket * (2^b - 1)). + static constexpr size_type bucket_first_index_(size_type b) noexcept { + return kFirstBucket * ((size_type{1} << b) - 1); + } + + // Map element index ``i`` to (bucket, offset-within-bucket). Group elements into + // chunks of kFirstBucket, then apply the power-of-two bucket layout to the chunk + // index: q = i / kFirstBucket; bucket = floor(log2(q+1)); the bucket's first global + // index is kFirstBucket * (2^bucket - 1). + static constexpr std::pair locate_(size_type i) noexcept { + size_type q = i / kFirstBucket; + size_type bucket = static_cast(std::bit_width(q + 1)) - 1; + return {bucket, i - bucket_first_index_(bucket)}; + } + + // Ensure bucket ``b`` is allocated (single-writer) and return its base pointer. The + // bucket's elements are raw storage until constructed by the caller; the pointer is + // published with release so readers that later observe a matching size see it. + T* ensure_bucket_(size_type b) { + T* bucket = dir_[b].load(std::memory_order_relaxed); + if (bucket == nullptr) { + bucket = static_cast(::operator new[](bucket_size_(b) * sizeof(T))); + dir_[b].store(bucket, std::memory_order_release); + if (b + 1 > allocated_buckets_) { + allocated_buckets_ = b + 1; + } + } + return bucket; + } + + // Construct elements [from, to) in place (single-writer). ``fill`` is nullptr for + // default-construction. Allocates buckets as needed. + void construct_range_(size_type from, size_type to, const T* fill) { + for (size_type i = from; i < to; ++i) { + auto [b, off] = locate_(i); + T* bucket = ensure_bucket_(b); + if (fill == nullptr) { + new (&bucket[off]) T(); + } else { + new (&bucket[off]) T(*fill); + } + } + } + + // Destroy elements [from, to) (single-writer). Does not free buckets. + void destroy_range_(size_type from, size_type to) { + for (size_type i = from; i < to; ++i) { + auto [b, off] = locate_(i); + dir_[b].load(std::memory_order_relaxed)[off].~T(); + } + } + + void resize_impl_(size_type n, const T* fill) { + size_type old = size_.load(std::memory_order_relaxed); + if (n == old) { + return; + } + if (n < old) { + // Logical-only shrink (no bucket freeing — use shrink_to for reclamation), + // but still destroy the dropped elements to run their destructors. + size_.store(n, std::memory_order_release); + destroy_range_(n, old); + return; + } + // Grow: construct the new elements, then publish the new size last so a reader + // that observes it sees fully-constructed elements in published buckets. + construct_range_(old, n, fill); + size_.store(n, std::memory_order_release); + } + + // Free every bucket whose entire index range lies at or above ``n`` (single-writer; + // readers drained). A bucket straddling ``n`` keeps its allocation. + void free_buckets_above_(size_type n) { + for (size_type b = allocated_buckets_; b-- > 0;) { + if (bucket_first_index_(b) < n) { + break; // this and all lower buckets contain live (or kept) elements + } + T* bucket = dir_[b].load(std::memory_order_relaxed); + if (bucket != nullptr) { + ::operator delete[](static_cast(bucket)); + dir_[b].store(nullptr, std::memory_order_relaxed); + } + allocated_buckets_ = b; + } + } + + void destroy_all_() { + size_type n = size_.load(std::memory_order_relaxed); + destroy_range_(0, n); + for (size_type b = 0; b < allocated_buckets_; ++b) { + T* bucket = dir_[b].load(std::memory_order_relaxed); + if (bucket != nullptr) { + ::operator delete[](static_cast(bucket)); + dir_[b].store(nullptr, std::memory_order_relaxed); + } + } + size_.store(0, std::memory_order_relaxed); + allocated_buckets_ = 0; + } + + void copy_from_(const SegmentedVector& other) { + size_type n = other.size_.load(std::memory_order_relaxed); + for (size_type i = 0; i < n; ++i) { + auto [b, off] = locate_(i); + T* bucket = ensure_bucket_(b); + new (&bucket[off]) T(other[i]); + } + size_.store(n, std::memory_order_release); + } + + void steal_from_(SegmentedVector& other) noexcept { + for (size_type b = 0; b < kDirBuckets; ++b) { + dir_[b].store( + other.dir_[b].load(std::memory_order_relaxed), std::memory_order_relaxed + ); + other.dir_[b].store(nullptr, std::memory_order_relaxed); + } + size_.store(other.size_.load(std::memory_order_relaxed), std::memory_order_relaxed); + allocated_buckets_ = other.allocated_buckets_; + other.size_.store(0, std::memory_order_relaxed); + other.allocated_buckets_ = 0; + } +}; + +} // namespace svs::lib + +namespace svs { +// Opt SegmentedVector into svs::getindex's optional bounds checking. +template +inline constexpr bool enable_boundschecking> = true; +} // namespace svs diff --git a/include/svs/lib/spinlock.h b/include/svs/lib/spinlock.h index 3f3b85486..b4b644dbe 100644 --- a/include/svs/lib/spinlock.h +++ b/include/svs/lib/spinlock.h @@ -37,6 +37,13 @@ class SpinLock { public: SpinLock() = default; + SpinLock(const SpinLock& /*unused*/) + : value_{false} {} + SpinLock& operator=(const SpinLock& /*unused*/) { return *this; } + SpinLock(SpinLock&& /*unused*/) noexcept + : value_{false} {} + SpinLock& operator=(SpinLock&& /*unused*/) noexcept { return *this; } + /// /// Implement C++ named requirements "Lockable" /// diff --git a/include/svs/quantization/scalar/scalar.h b/include/svs/quantization/scalar/scalar.h index 1ac48ca34..2ff1e88fa 100644 --- a/include/svs/quantization/scalar/scalar.h +++ b/include/svs/quantization/scalar/scalar.h @@ -392,6 +392,7 @@ class SQDataset { , data_{std::move(data)} {} size_t size() const { return data_.size(); } + size_t capacity() const { return data_.capacity(); } size_t dimensions() const { return data_.dimensions(); } size_t element_size() const { return sizeof(element_type) * dimensions(); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 8c812d35a..cfaa80d3b 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -79,6 +79,7 @@ set(TEST_SOURCES ${TEST_DIR}/svs/lib/saveload.cpp ${TEST_DIR}/svs/lib/saveload/load.cpp ${TEST_DIR}/svs/lib/scopeguard.cpp + ${TEST_DIR}/svs/lib/segmented_vector.cpp ${TEST_DIR}/svs/lib/spinlock.cpp ${TEST_DIR}/svs/lib/static.cpp ${TEST_DIR}/svs/lib/timing.cpp diff --git a/tests/svs/lib/segmented_vector.cpp b/tests/svs/lib/segmented_vector.cpp new file mode 100644 index 000000000..6ffb1c4d5 --- /dev/null +++ b/tests/svs/lib/segmented_vector.cpp @@ -0,0 +1,237 @@ +/* + * Copyright 2026 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// header under test +#include "svs/lib/segmented_vector.h" + +// catch2 +#include "catch2/catch_test_macros.hpp" + +// stl +#include +#include +#include + +namespace { + +// Even modest element counts exercise many directory buckets, since bucket k holds +// only 1<; + +} // namespace + +CATCH_TEST_CASE("SegmentedVector basic semantics", "[core][segmented_vector]") { + CATCH_SECTION("Default construction is empty") { + SmallVec v; + CATCH_REQUIRE(v.size() == 0); + } + + CATCH_SECTION("Sized construction and read/write") { + SmallVec v(10); + CATCH_REQUIRE(v.size() == 10); + for (size_t i = 0; i < 10; ++i) { + v[i] = static_cast(i * 2); + } + for (size_t i = 0; i < 10; ++i) { + CATCH_REQUIRE(v[i] == static_cast(i * 2)); + } + } + + CATCH_SECTION("Fill construction") { + SmallVec v(7, 42); + CATCH_REQUIRE(v.size() == 7); + for (size_t i = 0; i < 7; ++i) { + CATCH_REQUIRE(v[i] == 42); + } + } + + CATCH_SECTION("resize grows and preserves existing elements") { + SmallVec v(5); + for (size_t i = 0; i < 5; ++i) { + v[i] = static_cast(i); + } + v.resize(100); + CATCH_REQUIRE(v.size() == 100); + for (size_t i = 0; i < 5; ++i) { + CATCH_REQUIRE(v[i] == static_cast(i)); + } + } + + CATCH_SECTION("resize with fill") { + SmallVec v(3, 1); + v.resize(20, 9); + CATCH_REQUIRE(v.size() == 20); + for (size_t i = 0; i < 3; ++i) { + CATCH_REQUIRE(v[i] == 1); + } + for (size_t i = 3; i < 20; ++i) { + CATCH_REQUIRE(v[i] == 9); + } + } + + CATCH_SECTION("shrink via resize destroys dropped elements; regrow default-constructs" + ) { + SmallVec v(50); + for (size_t i = 0; i < 50; ++i) { + v[i] = static_cast(i); + } + v.resize(10); + CATCH_REQUIRE(v.size() == 10); + for (size_t i = 0; i < 10; ++i) { + CATCH_REQUIRE(v[i] == static_cast(i)); + } + // Regrow: std::vector-like semantics — the dropped range was destroyed on + // shrink, so the regrown tail is default-constructed (0), not the old values. + v.resize(50); + CATCH_REQUIRE(v.size() == 50); + for (size_t i = 0; i < 10; ++i) { + CATCH_REQUIRE(v[i] == static_cast(i)); + } + for (size_t i = 10; i < 50; ++i) { + CATCH_REQUIRE(v[i] == 0); + } + } + + CATCH_SECTION("shrink_to frees segments and lowers size") { + SmallVec v(100); + for (size_t i = 0; i < 100; ++i) { + v[i] = static_cast(i); + } + v.shrink_to(7); + CATCH_REQUIRE(v.size() == 7); + for (size_t i = 0; i < 7; ++i) { + CATCH_REQUIRE(v[i] == static_cast(i)); + } + // Regrow after a real shrink: new elements default-constructed (0). + v.resize(20); + CATCH_REQUIRE(v.size() == 20); + for (size_t i = 0; i < 7; ++i) { + CATCH_REQUIRE(v[i] == static_cast(i)); + } + } +} + +CATCH_TEST_CASE("SegmentedVector copy and move", "[core][segmented_vector]") { + SmallVec v(30); + for (size_t i = 0; i < 30; ++i) { + v[i] = static_cast(i + 100); + } + + CATCH_SECTION("copy construction is a deep copy") { + SmallVec c(v); + CATCH_REQUIRE(c.size() == 30); + for (size_t i = 0; i < 30; ++i) { + CATCH_REQUIRE(c[i] == static_cast(i + 100)); + } + c[0] = -1; + CATCH_REQUIRE(v[0] == 100); // original unaffected + } + + CATCH_SECTION("move construction transfers contents") { + SmallVec m(std::move(v)); + CATCH_REQUIRE(m.size() == 30); + for (size_t i = 0; i < 30; ++i) { + CATCH_REQUIRE(m[i] == static_cast(i + 100)); + } + } +} + +// Address stability: a reference to v[i] obtained before a grow must remain valid +// (point to the same storage) after the grow. This is the core invariant Option C +// relies on. +CATCH_TEST_CASE( + "SegmentedVector grow preserves element addresses", "[core][segmented_vector]" +) { + SmallVec v(8); + for (size_t i = 0; i < 8; ++i) { + v[i] = static_cast(i); + } + std::vector addrs; + for (size_t i = 0; i < 8; ++i) { + addrs.push_back(&v[i]); + } + // Force many segment + directory-bucket allocations. + v.resize(10000); + for (size_t i = 0; i < 8; ++i) { + CATCH_REQUIRE(&v[i] == addrs[i]); // address unchanged + CATCH_REQUIRE(*addrs[i] == static_cast(i)); // value intact + } +} + +// Concurrent grow vs. read: one writer repeatedly grows the vector and fills the new +// elements, publishing a separate `published` counter ONLY after the fill — exactly the +// "publish after construct" contract the dynamic index uses (num_valid_ bumped after a +// slot is fully Valid). Readers read every index < published and assert v[i] == i with +// no tolerance. This both (a) catches use-after-free / torn outer-pointer reads on grow +// and (b) is a hard value invariant. Under TSan it must report no data races. +CATCH_TEST_CASE("SegmentedVector concurrent grow and read", "[core][segmented_vector]") { + using Vec = svs::lib::SegmentedVector; + constexpr size_t kInitial = 100; + constexpr size_t kFinal = 200000; + constexpr size_t kReaders = 8; + + Vec v(kInitial); + for (size_t i = 0; i < kInitial; ++i) { + v[i] = static_cast(i); + } + + std::atomic start{false}; + std::atomic writer_done{false}; + std::atomic published{kInitial}; + std::atomic failure{false}; + + auto reader = [&]() { + while (!start.load(std::memory_order_acquire)) {} + while (!writer_done.load(std::memory_order_acquire)) { + size_t n = published.load(std::memory_order_acquire); + size_t step = n / 256 + 1; + for (size_t i = 0; i < n; i += step) { + if (v[i] != static_cast(i)) { + failure.store(true, std::memory_order_relaxed); + } + } + } + }; + + std::vector readers; + for (size_t r = 0; r < kReaders; ++r) { + readers.emplace_back(reader); + } + + std::thread writer([&]() { + while (!start.load(std::memory_order_acquire)) {} + size_t cur = kInitial; + while (cur < kFinal) { + size_t next = std::min(cur + 137, kFinal); + v.resize(next); // allocate + publish segments (grow) + for (size_t i = cur; i < next; ++i) { // fill new elements + v[i] = static_cast(i); + } + published.store(next, std::memory_order_release); // publish after fill + cur = next; + } + writer_done.store(true, std::memory_order_release); + }); + + start.store(true, std::memory_order_release); + writer.join(); + for (auto& t : readers) { + t.join(); + } + + CATCH_REQUIRE(v.size() == kFinal); + CATCH_REQUIRE_FALSE(failure.load(std::memory_order_relaxed)); +}