Add support for async workflow activities #1053
        f"Activity '{req.name}#{req.taskId}' result is too large to deliver "
        f'(RESOURCE_EXHAUSTED). Failing the activity task: {rpc_error.details()}'
    )
    failure_res = pb.ActivityResponse(
This nesting got hard to follow, I needed to refactor this file to understand the logic better.
Codecov Report
❌ Patch coverage is
Additional details and impacted files

    @@            Coverage Diff             @@
    ##             main    #1053      +/-   ##
    ==========================================
    - Coverage   86.63%   83.18%   -3.45%
    ==========================================
      Files          84      152      +68
      Lines        4473    15536   +11063
    ==========================================
    + Hits         3875    12924    +9049
    - Misses        598     2612    +2014
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py:678
- This adds an extra registry lookup (`get_activity`) and `inspect.iscoroutinefunction` check on the hot dispatch path, but `_ActivityExecutor._resolve()` will look up the activity again during execution. Consider plumbing `activity_fn` (and/or an `is_async` flag decided at registration time) through to the executor so each activity work item only does one lookup in total.
activity_handler,
work_item.activityRequest,
stub,
work_item.completionToken,
)
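One way to read the suggestion above, as a minimal sketch: classify each activity once at registration and hand the resolved entry to the executor, so the dispatch path does a single lookup. `RegisteredActivity`, `ActivityRegistry`, and `resolve` are hypothetical names used for illustration only; they are not the actual structures in `worker.py`.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class RegisteredActivity:
    """Hypothetical registry entry: the callable plus a flag computed once."""

    fn: Callable[..., Any]
    is_async: bool


class ActivityRegistry:
    """Hypothetical registry that classifies activities at registration time."""

    def __init__(self) -> None:
        self._activities: dict[str, RegisteredActivity] = {}

    def add_activity(self, name: str, fn: Callable[..., Any]) -> None:
        # inspect.iscoroutinefunction runs here, once, not per work item.
        self._activities[name] = RegisteredActivity(fn, inspect.iscoroutinefunction(fn))

    def resolve(self, name: str) -> RegisteredActivity:
        # Single lookup on the dispatch path; the entry is passed on to the executor.
        return self._activities[name]


def sync_echo(ctx: object, value: str) -> str:
    return value


async def async_echo(ctx: object, value: str) -> str:
    return value


registry = ActivityRegistry()
registry.add_activity('sync_echo', sync_echo)
registry.add_activity('async_echo', async_echo)
```

With this shape the dispatcher would branch on `entry.is_async` and pass `entry.fn` along, so the executor would not need to resolve the name a second time.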
Improving benchmarks and regression tests before submitting for review again
Thanks for working on this! I agree with the general direction: async activity support is something we've wanted for a while (#834, #897, #975), and keeping the user-facing API unchanged is the right approach.

I'd like to share my opinion on the benchmarking side, though. I think the sync-vs-async comparison is valuable in the context of this PR, to justify the why behind the change, but not as something we keep and maintain in the codebase. Concretely:

My suggestion: run the benchmarks locally and paste the results + methodology into the PR description. Then the harness, the

That said, I do think we should keep one simple throughput regression test so the async path can't silently regress: something like running 1000 activities that each take 1 second and asserting the batch completes in well under 10 seconds (ideally it's ~1s, so a 10x bound leaves plenty of headroom for CI noise). That single assertion catches the failure mode we actually care about (async activities accidentally getting serialized) without needing the full harness, and it can be written directly against the runtime in a few dozen lines. Thoughts?
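To make the shape of that regression test concrete, here is a rough sketch. It uses plain `asyncio` rather than the workflow runtime, and scales the numbers down (200 activities of 50 ms each) so it runs quickly; `slow_activity`, `run_batch`, and `check_not_serialized` are hypothetical names, and a real test would route the activities through the runtime's dispatch path instead of `asyncio.gather`.

```python
import asyncio
import time


async def slow_activity(delay_s: float) -> None:
    """Stand-in for an I/O-bound async activity (hypothetical)."""
    await asyncio.sleep(delay_s)


async def run_batch(n_items: int, delay_s: float) -> float:
    """Run n_items activities concurrently and return the wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(slow_activity(delay_s) for _ in range(n_items)))
    return time.perf_counter() - start


def check_not_serialized(n_items: int = 200, delay_s: float = 0.05) -> float:
    """Fail if the batch takes anywhere near the serialized duration."""
    elapsed = asyncio.run(run_batch(n_items, delay_s))
    # Serialized execution would take n_items * delay_s (10 s with the defaults);
    # a 10x bound over the ideal (~delay_s) leaves headroom for CI noise.
    assert elapsed < 10 * delay_s, f'batch took {elapsed:.3f}s, activities look serialized'
    return elapsed
```

The bound is expressed relative to a single activity's duration, so the same assertion would still hold if the per-activity delay were changed.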
sicoyle left a comment
few comments so far. I'm almost half way through the files/changes :)
    try:
        return Path(path).read_text(encoding='utf-8', errors='ignore')
    except OSError:
        return ''
could you maybe add a warn log here
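A minimal sketch of what that could look like. The function name `read_text_or_empty` and the module-level `logger` are assumptions, since the surrounding code is not shown in this hunk.

```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def read_text_or_empty(path: str) -> str:
    """Return the file's text, or '' (with a warning) if it cannot be read."""
    try:
        return Path(path).read_text(encoding='utf-8', errors='ignore')
    except OSError as exc:
        # Surface the failure instead of silently returning an empty string.
        logger.warning('Could not read %s: %s', path, exc)
        return ''
```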
    """Sync-vs-async activity benchmarks for ``dapr-ext-workflow``.

    Runs the same I/O-bound activity workload as ``def`` and ``async def`` through the
    production dispatch path against a mock sidecar stub. Scenarios: a fan-out burst, a
    fan-out shaped as many small workflows, and a sustained open-loop run.

    Run:

        uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py

    ``DAPR_BENCH_ACTIVITY_MS`` overrides the activity duration, ``DAPR_BENCH_SUSTAINED_SECONDS``
    the sustained run. Writes ``benchmarks/RESULTS.md`` and asserts pass-criteria budgets.
    """
ideally this is not needed and is included in a readme pls
        return rows


    async def run_sustained() -> tuple[SustainedMetrics, SustainedMetrics]:
what does this mean sustained? like constant throughput over the same duration?
    # Sync vs async activity benchmark

    Generated by `bench_async_activities.py`. Re-run with:

    ```bash
    uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py
pls don't include how to run here bc this can easily become stale if we rename the file or move things.
    {_format_sustained_table(sustained_sync, sustained_async)}

    See `ext/dapr-ext-workflow/docs/concurrency.md` for sizing guidance.
is this a doc you plan to share? or can you rm this ref pls?
        end_ts: dict[int, float],
    ) -> tuple[Callable[..., object], Callable[..., object]]:
        """Build the activity callable and pick the matching dispatch handler for ``kind``."""
        if kind == 'async':

                else _async_sleep_factory(latency_s, start_ts, end_ts)
            )
            return fn, worker._execute_activity_async
        if kind == 'sync':

        )


    ActivityFactory = Callable[[dict[int, float], dict[int, float]], Callable[..., object]]
in general can you also split this file up with smaller appropriately named files pls?
    def _metrics(
        *,
        name: str,
        n_items: int,
        semaphore_cap: int,
        thread_pool_workers: int,
        server_latency_s: float,
        wallclock_s: float,
        e2e_samples: list[float],
        sampler: _Sampler,
        baseline_rss_kb: int,
    ) -> ScenarioMetrics:
        completed = len(e2e_samples) if e2e_samples else n_items
        return ScenarioMetrics(
            name=name,
            n_items=n_items,
            semaphore_cap=semaphore_cap,
            thread_pool_workers=thread_pool_workers,
            server_latency_s=server_latency_s,
            wallclock_s=wallclock_s,
            throughput_per_s=completed / wallclock_s if wallclock_s > 0 else 0.0,
            latency=LatencyStats.from_samples(e2e_samples),
            peak_tasks=sampler.peak_tasks,
            peak_queue_depth=sampler.peak_queue_depth,
            peak_rss_delta_mb=max(0.0, (sampler.peak_rss_kb - baseline_rss_kb) / 1024.0),
        )


    def _make_activity_context(orchestration_id: str, task_id: int) -> task.ActivityContext:
        return task.ActivityContext(orchestration_id, task_id, '', propagated_history=None)
is it really worth creating helpers that are one liners?
    )


    async def _run_lite(
maybe docstrings would help on some of these funcs pls?
Description
Workflow activities can now be async, and the runtime will automatically dispatch them to the event loop. Sync activities are still dispatched to the thread pool. The user-facing API remains exactly the same.
Also added a benchmark suite to verify performance locally.
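The dispatch rule described above can be illustrated with a small standalone sketch. It assumes plain `asyncio` and a `ThreadPoolExecutor`; `dispatch`, `sync_activity`, `async_activity`, and `demo` are hypothetical names and do not reflect the runtime's real internals.

```python
import asyncio
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


async def dispatch(fn: Callable[..., Any], pool: ThreadPoolExecutor, *args: Any) -> Any:
    """Hypothetical dispatcher: coroutines stay on the loop, plain functions go to threads."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, fn, *args)


def sync_activity(ctx: object) -> int:
    # Reports the thread it ran on (a pool worker).
    return threading.get_ident()


async def async_activity(ctx: object) -> int:
    await asyncio.sleep(0)
    # Reports the thread it ran on (the event loop's thread).
    return threading.get_ident()


async def demo() -> tuple[int, int, int]:
    with ThreadPoolExecutor(max_workers=2) as pool:
        sync_tid = await dispatch(sync_activity, pool, None)
        async_tid = await dispatch(async_activity, pool, None)
    return threading.get_ident(), sync_tid, async_tid
```

Both kinds of activity are awaited the same way by the caller, which is consistent with the user-facing API staying unchanged.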
Issue reference
We strive to have every PR opened based on an issue where the problem or feature has been discussed prior to implementation.
Please reference the issue this PR will close: #834 #897 #975
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: