Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
555 changes: 555 additions & 0 deletions src/backend/distributed/executor/adaptive_executor.c

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "distributed/merge_executor.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/shard_utils.h"
Expand All @@ -60,6 +61,7 @@ extern AllowedDistributionColumn AllowedDistributionColumnValue;
/* functions for creating custom scan nodes */
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
static Node * SortedMergeCreateScan(CustomScan *scan);
static Node * SingleTaskExecutorCreateScan(CustomScan *scan);
static Node * NonPushableInsertSelectCreateScan(CustomScan *scan);
static Node * DelayedErrorCreateScan(CustomScan *scan);
static Node * NonPushableMergeCommandCreateScan(CustomScan *scan);
Expand All @@ -69,6 +71,7 @@ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusPreExecScan(CitusScanState *scanState);
static TupleTableSlot * CitusExecOneTaskScan(CustomScanState *node);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
Expand Down Expand Up @@ -99,6 +102,11 @@ CustomScanMethods SortedMergeCustomScanMethods = {
SortedMergeCreateScan
};

CustomScanMethods SingleTaskExecutorCustomScanMethods = {
"Citus Single Task",
SingleTaskExecutorCreateScan
};

CustomScanMethods NonPushableInsertSelectCustomScanMethods = {
"Citus INSERT ... SELECT",
NonPushableInsertSelectCreateScan
Expand Down Expand Up @@ -127,6 +135,15 @@ static CustomExecMethods AdaptiveExecutorCustomExecMethods = {
.ExplainCustomScan = CitusExplainScan
};

/*
 * Executor-side callbacks for the single-task executor scan. Only the
 * ExecCustomScan callback is specific to this executor.
 */
static CustomExecMethods SingleTaskExecutorCustomExecMethods = {
	.CustomName = "SingleTaskExecutorScan",

	/* scan lifecycle */
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = CitusExecOneTaskScan,
	.ReScanCustomScan = CitusReScan,
	.EndCustomScan = CitusEndScan,

	/* EXPLAIN output */
	.ExplainCustomScan = CitusExplainScan
};

static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
.CustomName = "NonPushableInsertSelectScan",
.BeginCustomScan = CitusBeginScan,
Expand Down Expand Up @@ -179,6 +196,7 @@ IsCitusCustomState(PlanState *planState)
CustomScanState *css = castNode(CustomScanState, planState);
if (css->methods == &AdaptiveExecutorCustomExecMethods ||
css->methods == &SortedMergeCustomExecMethods ||
css->methods == &SingleTaskExecutorCustomExecMethods ||
css->methods == &NonPushableInsertSelectCustomExecMethods ||
css->methods == &NonPushableMergeCommandCustomExecMethods)
{
Expand All @@ -197,6 +215,7 @@ RegisterCitusCustomScanMethods(void)
{
RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
RegisterCustomScanMethods(&SortedMergeCustomScanMethods);
RegisterCustomScanMethods(&SingleTaskExecutorCustomScanMethods);
RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
RegisterCustomScanMethods(&NonPushableMergeCommandCustomScanMethods);
Expand Down Expand Up @@ -335,6 +354,40 @@ CitusExecScan(CustomScanState *node)
}


/*
 * CitusExecOneTaskScan implements the ExecCustomScan callback of the
 * single-task executor scan node.
 *
 * The first call runs the remote query, through SingleTaskExecutor() in the
 * regular case or through EagerAdaptiveExecutor() when the scan was requested
 * for EXPLAIN ANALYZE, and bumps the single-shard query counter unless the
 * plan disables counter tracking. Every call, including the first one, hands
 * back the result of ReturnTupleFromTuplestore().
 */
static TupleTableSlot *
CitusExecOneTaskScan(CustomScanState *node)
{
	CitusScanState *citusScanState = (CitusScanState *) node;

	/* the remote part already ran, only the stored results are left to read */
	if (citusScanState->finishedRemoteScan)
	{
		return ReturnTupleFromTuplestore(citusScanState);
	}

	bool explainAnalyzeRequested = RequestedForExplainAnalyze(citusScanState);
	if (explainAnalyzeRequested)
	{
		/* EXPLAIN ANALYZE goes through the full adaptive executor */
		EagerAdaptiveExecutor(citusScanState);
	}
	else
	{
		SingleTaskExecutor(citusScanState);
	}

	/* the flag is read only after execution, same as before the restyle */
	bool skipQueryCounters =
		citusScanState->distributedPlan->disableTrackingQueryCounters;
	if (!skipQueryCounters)
	{
		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
	}

	citusScanState->finishedRemoteScan = true;

	return ReturnTupleFromTuplestore(citusScanState);
}


/*
* CitusBeginReadOnlyScan handles deferred pruning and plan caching for SELECTs.
*/
Expand Down Expand Up @@ -805,6 +858,30 @@ SortedMergeCreateScan(CustomScan *scan)
}


/*
 * SingleTaskExecutorCreateScan builds the CitusScanState for a custom scan
 * that is executed by the single-task executor (single-shard fast-path
 * queries). The returned node is wired to SingleTaskExecutorCustomExecMethods
 * and carries the distributed plan extracted from the given custom scan.
 */
static Node *
SingleTaskExecutorCreateScan(CustomScan *scan)
{
	CitusScanState *citusScanState = palloc0(sizeof(CitusScanState));
	CustomScanState *customScanState = &citusScanState->customScanState;

	/* generic custom scan state setup */
	customScanState->ss.ps.type = T_CustomScanState;
	customScanState->methods = &SingleTaskExecutorCustomExecMethods;

	/*
	 * The executor type is deliberately MULTI_EXECUTOR_ADAPTIVE so that
	 * stats are bucketed together with the adaptive executor
	 * (see Research §6.1).
	 */
	citusScanState->executorType = MULTI_EXECUTOR_ADAPTIVE;
	citusScanState->distributedPlan = GetDistributedPlan(scan);
	citusScanState->PreExecScan = &CitusPreExecScan;

	/* neither the pre-scan nor the remote scan has run yet */
	citusScanState->finishedPreScan = false;
	citusScanState->finishedRemoteScan = false;

	return (Node *) citusScanState;
}


/*
* NonPushableInsertSelectCreateScan creates the scan state for executing
* INSERT..SELECT into a distributed table via the coordinator.
Expand Down
43 changes: 41 additions & 2 deletions src/backend/distributed/executor/multi_server_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/listutils.h"
#include "distributed/log_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
#include "distributed/worker_protocol.h"

int RemoteTaskCheckInterval = 10; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_ADAPTIVE; /* distributed executor type */
bool EnableRepartitionJoins = false;
bool EnableSingleTaskExecution = true;


/*
Expand Down Expand Up @@ -99,6 +102,42 @@ JobExecutorType(DistributedPlan *distributedPlan)
}
}

return distributedPlan->useSortedMerge ? MULTI_EXECUTOR_SORTED_MERGE :
MULTI_EXECUTOR_ADAPTIVE;
if (EnableSingleTaskExecution &&
distributedPlan->fastPathRouterPlan &&
list_length(job->dependentJobList) == 0 &&
!IsMultiRowInsert(job->jobQuery) &&
!(distributedPlan->modLevel > ROW_MODIFY_READONLY &&
IsCitusTableType(distributedPlan->targetRelationId, REFERENCE_TABLE)))
{
/*
* We get the relation OID from the task's relationShardList rather
* than distributedPlan->relationIdList because the latter may contain
* shard OIDs after CheckAndBuildDelayedFastPathPlan() replaces the
* shell table OID with the shard OID for local execution plans.
*/
Oid relationId = distributedPlan->targetRelationId;
if (!OidIsValid(relationId) && list_length(job->taskList) == 1)
{
Task *task = (Task *) linitial(job->taskList);
if (list_length(task->relationShardList) > 0)
Comment on lines +118 to +122

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The !OidIsValid(relationId) check covers SELECT queries, which always have targetRelationId = InvalidOid. Reading from a single placement is safe regardless of replication factor. For DML, targetRelationId is always set, so the SingleReplicatedTable() check correctly routes multi-replicated tables to the adaptive executor for write replication.

{
RelationShard *rs = (RelationShard *) linitial(
task->relationShardList);
relationId = rs->relationId;
}
}

if (!OidIsValid(relationId) ||
SingleReplicatedTable(relationId))
{
return MULTI_EXECUTOR_SINGLE_TASK;
}
}

if (distributedPlan->useSortedMerge)
{
return MULTI_EXECUTOR_SORTED_MERGE;
}

return MULTI_EXECUTOR_ADAPTIVE;
}
6 changes: 6 additions & 0 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,12 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan,
break;
}

case MULTI_EXECUTOR_SINGLE_TASK:
{
customScan->methods = &SingleTaskExecutorCustomScanMethods;
break;
}

case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT:
{
customScan->methods = &NonPushableInsertSelectCustomScanMethods;
Expand Down
13 changes: 13 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,19 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.enable_single_task_execution",
gettext_noop("Enables the optimized single-task executor for "
"fast-path router queries."),
gettext_noop("When enabled, single-shard fast-path queries use a "
"streamlined executor that bypasses connection pool "
"management and wait event set overhead."),
&EnableSingleTaskExecution,
true,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.enable_sorted_merge",
gettext_noop("Enables sorted merge of worker results for ORDER BY queries."),
Expand Down
2 changes: 1 addition & 1 deletion src/include/distributed/adaptive_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize
bool localExecutionSupported);
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, List *jobIdList);

extern void EagerAdaptiveExecutor(CitusScanState *scanState);
extern void SingleTaskExecutor(CitusScanState *scanState);

#endif /* ADAPTIVE_EXECUTOR_H */
1 change: 1 addition & 0 deletions src/include/distributed/citus_custom_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ extern CustomScanMethods SortedMergeCustomScanMethods;
extern CustomScanMethods NonPushableInsertSelectCustomScanMethods;
extern CustomScanMethods DelayedErrorCustomScanMethods;
extern CustomScanMethods NonPushableMergeCommandCustomScanMethods;
extern CustomScanMethods SingleTaskExecutorCustomScanMethods;


extern void RegisterCitusCustomScanMethods(void);
Expand Down
4 changes: 3 additions & 1 deletion src/include/distributed/multi_server_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ typedef enum
MULTI_EXECUTOR_ADAPTIVE = 1,
MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT = 2,
MULTI_EXECUTOR_NON_PUSHABLE_MERGE_QUERY = 3,
MULTI_EXECUTOR_SORTED_MERGE = 4
MULTI_EXECUTOR_SORTED_MERGE = 4,
MULTI_EXECUTOR_SINGLE_TASK = 5
} MultiExecutorType;


/* Config variable managed via guc.c */
extern int RemoteTaskCheckInterval;
extern int TaskExecutorType;
extern bool EnableRepartitionJoins;
extern bool EnableSingleTaskExecution;
extern int MultiTaskQueryLogLevel;


Expand Down
11 changes: 11 additions & 0 deletions src/test/regress/bin/normalize.sed
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,17 @@ s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g
# normalize tenants statistics annotations
s/\/\*\{"cId":.*\*\///g

# normalize repartition hash boundaries in worker_partition_query_result calls
# (partition count varies by number of readable nodes in the cluster)
/worker_partition_query_result/s/'\{[0-9,-]+\}'::text\[\]/'{...}'::text[]/g

# normalize repartition partition index suffix (varies by node count)
s/repartition_([0-9]+_[0-9]+)_[0-9]+/repartition_\1_N/g

# normalize shard IDs in the 1470xxx range (used by local_shard_execution tests)
# shard-to-worker placement varies depending on which schedule runs before the test
s/_147[0-9]{4}/_xxxxxxx/g

# Notice message that contains current columnar version that makes it harder to bump versions
s/(NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION )"[0-9]+\.[0-9]+-[0-9]+"/\1 "x.y-z"/

Expand Down
1 change: 1 addition & 0 deletions src/test/regress/citus_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def extra_tests(self):
"multi_test_catalog_views": TestDeps(None),
"multi_insert_select": TestDeps("base_schedule"),
"multi_partitioning": TestDeps("base_schedule"),
"multi_transaction_recovery": TestDeps("minimal_schedule"),
"multi_mx_create_table": TestDeps(
None,
[
Expand Down
Loading
Loading