Streamline adaptive executor for single task queries#8607
Codecov Report ❌

@@            Coverage Diff             @@
##             main    #8607      +/-   ##
==========================================
- Coverage   88.73%   88.72%   -0.01%
==========================================
  Files         288      288
  Lines       64384    64635     +251
  Branches     8108     8161      +53
==========================================
+ Hits        57133    57350     +217
- Misses       4909     4928      +19
- Partials     2342     2357      +15
7baf998 to 295095c
…path queries

Introduces SingleTaskAdaptiveExecutor, a streamlined execution path for single-shard fast-path router queries that bypasses the full adaptive executor's WaitEventSet machinery, pool management, and slow-start logic.

Eligibility: fastPathRouterPlan && citus.enable_single_task_execution GUC (default on) && not multi-row INSERT && no dependent jobs && single-replicated table. EXPLAIN ANALYZE falls back to the regular AdaptiveExecutor at execution time.

Uses simple WaitLatchOrSocket polling on a single connection instead of the multi-connection WaitEventSet infrastructure.

Multi-replicated tables (shard_replication_factor > 1) are excluded because SingleTaskAdaptiveExecutor only uses the first placement, which would silently skip replica writes. This covers modifications to reference tables.

New GUC: citus.enable_single_task_execution (PGC_USERSET, default true)
New executor type: MULTI_EXECUTOR_ONE_TASK_ADAPTIVE
New scan name: "Citus Single-Task Adaptive"

Self Review fixes: guard stat counter, document SortTupleStore omission and fix connection error text - use libpq error messages everywhere.

Also fix a parameter handling bug; `MarkUnreferencedExternParams()` modified the params list which was then passed to local execution, which expects unmodified params. Fixed by using unmodified params for local execution.
295095c to dfdc70b
ce9ced7 to 894f941
Fix local_shard_execution and multi_transaction_recovery to pass reliably under both standalone (flaky) and full-schedule CI runs.

local_shard_execution: Shard-to-worker placement varies depending on which cluster setup schedule runs before the test (minimal_cluster_management vs multi_cluster_management assign different group IDs, which changes round-robin shard placement). Add a normalize.sed rule to mask shard IDs in the 1470xxx range so the comparison succeeds regardless of placement order. Also use colocate_with => 'none' to prevent colocation group inheritance from prior tests.

multi_transaction_recovery: The test inserted fake pg_dist_transaction records using hardcoded group IDs (0 and 1), which only matched the multi_cluster_management setup. When the flaky runner uses minimal_cluster_management (group IDs 14 and 16), recover_prepared_transactions() could not find the matching nodes and left stale records. Fix by looking up the actual worker_1 group ID dynamically. Also DELETE FROM pg_dist_transaction at the start to clear residual records from prior schedule tests, and add a TestDeps entry so run_test.py uses minimal_schedule.
894f941 to db3dd8a
if (PQisBusy(connection->pgConn))
{
	int sock = PQsocket(connection->pgConn);
	int rc = WaitLatchOrSocket(MyLatch,
Doesn't this create a wait event set in every call? If so, what's the gain here? Seems like a lot of code to maintain.
Yes, it creates a WaitEventSet per call. This path doesn't fire when results are already buffered on entering the loop, but for large result sets the overhead could accumulate, so I will likely change it to create the set lazily, only when the socket is not immediately readable.
> what's the gain here?
The gain comes from bypassing the pool/session/connection-state-machine infrastructure, not from the wait mechanics. Running pgbench on a CPU-bound workload shows TPS gains of 15% (SELECT), 12% (INSERT), and 12% (UPDATE), so for an in-memory OLTP workload the gain is substantial.
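The "wait only when nothing is readable yet" idea from the reply above can be illustrated outside of Citus. The following is a minimal sketch, not the PR's code: it uses plain POSIX `poll()` as a stand-in for `WaitLatchOrSocket`, and the `WaitStats` struct and `WaitUntilReadable` function are hypothetical names introduced only for this example.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical counters, used only to show how often we actually block. */
typedef struct WaitStats
{
	int fastPathReads;  /* data was already available, no blocking wait */
	int blockingWaits;  /* had to block until the descriptor became readable */
} WaitStats;

/*
 * WaitUntilReadable returns true once fd has data to read.
 * It first probes with a zero timeout and only falls back to a
 * blocking poll() (up to timeoutMs) when nothing is pending.
 * Errors, including EINTR, are reported as "not readable" in this sketch.
 */
static bool
WaitUntilReadable(int fd, int timeoutMs, WaitStats *stats)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };

	assert(fd >= 0);

	/* Non-blocking probe: the cheap path when data is already there. */
	int rc = poll(&pfd, 1, 0);
	if (rc > 0 && (pfd.revents & POLLIN))
	{
		stats->fastPathReads++;
		return true;
	}
	if (rc < 0)
	{
		return false;
	}

	/* Nothing pending: pay for a blocking wait. */
	stats->blockingWaits++;
	pfd.revents = 0;
	rc = poll(&pfd, 1, timeoutMs);
	return rc > 0 && (pfd.revents & POLLIN);
}
```

In the executor itself the equivalent of the probe would presumably be the `PQisBusy()` check, which needs no system call; the sketch only shows the control flow of deferring the expensive wait.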
bbe402e to febc9ac
febc9ac to e1be8b7
Pull request overview
This PR introduces a new streamlined execution path (“SingleTaskAdaptiveExecutor”) for eligible single-shard fast-path router queries, aiming to reduce overhead by bypassing the full adaptive executor machinery while preserving EXPLAIN visibility and maintaining correctness via eligibility checks and fallbacks.
Changes:
- Add a new executor type (`MULTI_EXECUTOR_SINGLE_TASK`) plus custom scan plumbing to run a dedicated single-task execution path for eligible fast-path router queries.
- Introduce a new GUC (`citus.enable_single_task_execution`, default on) and update plan selection logic to choose the single-task executor when safe.
- Expand regression coverage with a new `single_task_execution` test, update multiple expected files for the new scan name, and add normalization to reduce output flakiness.
Reviewed changes
Copilot reviewed 43 out of 45 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/backend/distributed/executor/adaptive_executor.c | Implements SingleTaskExecutor() and supporting logic for single-connection polling and result ingestion. |
| src/backend/distributed/executor/citus_custom_scan.c | Registers the new custom scan methods and routes execution to SingleTaskExecutor (or falls back for EXPLAIN ANALYZE). |
| src/backend/distributed/executor/multi_server_executor.c | Chooses MULTI_EXECUTOR_SINGLE_TASK for eligible fast-path router plans (with replication/eligibility checks). |
| src/backend/distributed/planner/distributed_planner.c | Wires MULTI_EXECUTOR_SINGLE_TASK to the new custom scan methods during plan finalization. |
| src/backend/distributed/shared_library_init.c | Adds the citus.enable_single_task_execution GUC definition. |
| src/include/distributed/adaptive_executor.h | Exposes the SingleTaskExecutor(CitusScanState *) entry point. |
| src/include/distributed/citus_custom_scan.h | Declares SingleTaskExecutorCustomScanMethods. |
| src/include/distributed/multi_server_executor.h | Adds MULTI_EXECUTOR_SINGLE_TASK and the EnableSingleTaskExecution GUC variable. |
| src/test/regress/bin/normalize.sed | Normalizes repartition-related NOTICE output (hash boundaries, suffixes, shard id ranges) to reduce flakiness. |
| src/test/regress/citus_tests/run_test.py | Adjusts test deps (adds multi_transaction_recovery to minimal_schedule). |
| src/test/regress/multi_1_schedule | Schedules the new single_task_execution test in the non-parallelized block with other router prepared-statement tests. |
| src/test/regress/sql/single_task_execution.sql | New regression test validating explain output and result equivalence with the GUC toggled. |
| src/test/regress/sql/multi_transaction_recovery.sql | Makes the test more robust (cleanup + dynamic group IDs) and disables single-task exec for predictable 2PC behavior. |
| src/test/regress/sql/multi_router_planner_fast_path.sql | Forces custom plans for prepared statements to avoid flaky debug output; resets afterward. |
| src/test/regress/sql/local_shard_execution.sql | Makes colocations explicit; disables single-task for predictable error messages; wraps flaky error cases in DO/EXCEPTION checks. |
| src/test/regress/sql/failure_single_select.sql | Disables single-task exec for predictable error behavior in failure tests. |
| src/test/regress/sql/failure_savepoints.sql | Disables single-task exec for predictable error behavior in failure tests. |
| src/test/regress/sql/failure_multi_dml.sql | Disables single-task exec for predictable error behavior; ensures worker-side script also disables it. |
| src/test/regress/sql/failure_connection_establishment.sql | Disables single-task exec for predictable error behavior in connection-establishment tests. |
| src/test/regress/expected/single_task_execution.out | New expected output file for the new regression test. |
| src/test/regress/expected/upgrade_basic_after.out | Updates expected EXPLAIN scan name for single-task-eligible plans. |
| src/test/regress/expected/upgrade_basic_after_0.out | Adds an additional expected output variant for upgrade coverage. |
| src/test/regress/expected/sqlancer_failures.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/multi_router_planner_fast_path.out | Updates expected debug output + plan_cache_mode SET/RESET lines. |
| src/test/regress/expected/multi_transaction_recovery.out | Updates expected output for cleanup, dynamic group IDs, and the single-task disable. |
| src/test/regress/expected/local_shard_execution_replicated.out | Updates expected NOTICE output after normalization changes. |
| src/test/regress/expected/failure_single_select.out | Mirrors the SQL change disabling single-task exec. |
| src/test/regress/expected/failure_savepoints.out | Mirrors the SQL change disabling single-task exec. |
| src/test/regress/expected/failure_multi_dml.out | Mirrors the SQL change disabling single-task exec; expected worker script output changes. |
| src/test/regress/expected/failure_connection_establishment.out | Mirrors the SQL change disabling single-task exec. |
| src/test/regress/expected/multi_mx_insert_select_repartition.out | Updates expected NOTICE output after normalization changes. |
| src/test/regress/expected/multi_mx_explain.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/multi_mx_explain_0.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/multi_explain.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/multi_explain_0.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/multi_tenant_isolation.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/multi_tenant_isolation_nonblocking.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/pg13.out | Updates expected EXPLAIN/ANALYZE output formatting and scan name for single-task-eligible plan(s). |
| src/test/regress/expected/pg15.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/pg18.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable; aligns formatting for pg_get_loaded_modules output. |
| src/test/regress/expected/multi_data_types.out | Updates expected EXPLAIN/ANALYZE scan name + spacing for single-task-eligible DML. |
| src/test/regress/expected/drop_column_partitioned_table.out | Updates expected EXPLAIN scan name to “Citus Single Task” where applicable. |
| src/test/regress/expected/coordinator_shouldhaveshards.out | Updates expected NOTICE output after normalization changes. |
| src/test/regress/expected/single_node.out | Updates expected NOTICE output after normalization changes. |
Oid relationId = distributedPlan->targetRelationId;
if (!OidIsValid(relationId) && list_length(job->taskList) == 1)
{
	Task *task = (Task *) linitial(job->taskList);
	if (list_length(task->relationShardList) > 0)
The !OidIsValid(relationId) check covers SELECT queries, which always have targetRelationId = InvalidOid. Reading from a single placement is safe regardless of replication factor. For DML, targetRelationId is always set, so the SingleReplicatedTable() check correctly routes multi-replicated tables to the adaptive executor for write replication.
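The reasoning in this comment can be restated as a small predicate. This is a sketch of the argument only, not the code under review: `PlanSketch` and `SafeForSinglePlacement` are hypothetical names, and the replication factor is passed in directly instead of being looked up through `SingleReplicatedTable()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;          /* same underlying type PostgreSQL uses */
#define InvalidOid ((Oid) 0)

/* Hypothetical stand-in for the plan fields discussed in the comment. */
typedef struct PlanSketch
{
	Oid targetRelationId;   /* InvalidOid for SELECT, set for DML */
	int replicationFactor;  /* number of placements per shard */
} PlanSketch;

/*
 * SafeForSinglePlacement returns true when using only the first
 * placement cannot lose work.
 */
static bool
SafeForSinglePlacement(const PlanSketch *plan)
{
	assert(plan != NULL);

	if (plan->targetRelationId == InvalidOid)
	{
		/* SELECT: reading one placement is safe at any replication factor. */
		return true;
	}

	/* DML: every placement must be written, so require a single replica. */
	return plan->replicationFactor == 1;
}
```

A DML plan on a table with two placements per shard returns false here, which corresponds to routing it to the regular adaptive executor.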
DESCRIPTION: Adds a new scan path for fast-path queries: a streamlined version of the adaptive executor specialized for a single task.
Introduces SingleTaskAdaptiveExecutor, a streamlined execution path for single-shard fast-path router queries that bypasses the full adaptive executor's WaitEventSet machinery, pool management, and slow-start logic.
Eligibility: fastPathRouterPlan && citus.enable_single_task_execution GUC (default on) && not multi-row INSERT && no dependent jobs && single-replicated table. EXPLAIN ANALYZE falls back to the regular AdaptiveExecutor at execution time.
Uses simple WaitLatchOrSocket polling on a single connection instead of the multi-connection WaitEventSet infrastructure.
Multi-replicated tables (shard_replication_factor > 1) are excluded because SingleTaskAdaptiveExecutor only uses the first placement, which would silently skip replica writes. This exclusion also covers modifications to reference tables.
New GUC: citus.enable_single_task_execution (PGC_USERSET, default true)
New executor type: MULTI_EXECUTOR_SINGLE_TASK
New scan name: "Citus Single Task"
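
The eligibility rule in the description is a plain conjunction, with the EXPLAIN ANALYZE fallback applied later at execution time. The sketch below separates those two decisions. All names in it (`QuerySketch`, `EligibleForSingleTask`, `RunsOnSingleTaskPath`) are hypothetical; the real checks operate on the distributed plan and job structures, not on a struct of booleans.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical flags summarizing the conditions listed in the description. */
typedef struct QuerySketch
{
	bool fastPathRouterPlan;
	bool enableSingleTaskExecution;  /* citus.enable_single_task_execution */
	bool multiRowInsert;
	bool hasDependentJobs;
	bool singleReplicated;
	bool explainAnalyze;
} QuerySketch;

/* Plan-time eligibility: every condition must hold. */
static bool
EligibleForSingleTask(const QuerySketch *query)
{
	assert(query != NULL);

	return query->fastPathRouterPlan &&
		   query->enableSingleTaskExecution &&
		   !query->multiRowInsert &&
		   !query->hasDependentJobs &&
		   query->singleReplicated;
}

/*
 * Execution-time decision: an eligible plan still falls back to the
 * regular adaptive executor when run under EXPLAIN ANALYZE.
 */
static bool
RunsOnSingleTaskPath(const QuerySketch *query)
{
	return EligibleForSingleTask(query) && !query->explainAnalyze;
}
```

Keeping the EXPLAIN ANALYZE check out of the plan-time predicate matches the description's wording that the fallback happens "at execution time", so the plan can still carry the single-task scan while execution takes the regular path.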