diff --git a/LAST_VERSION b/LAST_VERSION index ee082731..6c2d9649 100644 --- a/LAST_VERSION +++ b/LAST_VERSION @@ -1 +1 @@ -1.0.0.dev5 +1.0.0.dev6 diff --git a/docs_src/checkpoint/checkpoint_input_change.py b/docs_src/checkpoint/checkpoint_input_change.py new file mode 100644 index 00000000..f859eec5 --- /dev/null +++ b/docs_src/checkpoint/checkpoint_input_change.py @@ -0,0 +1,38 @@ +from hashlib import sha256 +from uuid import UUID + +from dotflow import Config, DotFlow, action +from dotflow.providers import StorageFile + + +@action +def step_one(initial_context): + return {"loaded": initial_context.storage} + + +@action +def step_two(previous_context): + return {"transformed": previous_context.storage} + + +config = Config(storage=StorageFile()) + + +def main(): + payload = {"s3_key": "uploads/data.zip", "v": 1} + + workflow = DotFlow( + config=config, + workflow_id=UUID("12345678-1234-5678-1234-567812345678"), + ) + + workflow.task.add(step=[step_one, step_two], initial_context=payload) + + fp = sha256(payload["s3_key"].encode()).hexdigest() + workflow.start(resume=True, on_input_change="reset", fingerprint=fp) + + return workflow + + +if __name__ == "__main__": + main() diff --git a/dotflow/__init__.py b/dotflow/__init__.py index ef7c18c9..94985612 100644 --- a/dotflow/__init__.py +++ b/dotflow/__init__.py @@ -1,6 +1,6 @@ """Dotflow __init__ module.""" -__version__ = "1.0.0.dev5" +__version__ = "1.0.0.dev6" __description__ = "🎲 Dotflow turns an idea into flow!" 
from .core.action import Action as action diff --git a/dotflow/abc/flow.py b/dotflow/abc/flow.py index 2ce89ece..7efbe2ca 100644 --- a/dotflow/abc/flow.py +++ b/dotflow/abc/flow.py @@ -1,5 +1,7 @@ """Flow ABC""" +from __future__ import annotations + from abc import ABC, abstractmethod from uuid import UUID diff --git a/dotflow/abc/storage.py b/dotflow/abc/storage.py index 6264ea27..51ac1f3e 100644 --- a/dotflow/abc/storage.py +++ b/dotflow/abc/storage.py @@ -23,3 +23,11 @@ def get(self, key: str) -> Context: @abstractmethod def key(self, task: Callable): """Function that returns a key to get and post storage""" + + @abstractmethod + def clear(self, workflow_id: str) -> None: + """Remove every persisted entry under ``workflow_id``. + + Used by the input-fingerprint reset path when + ``on_input_change='reset'``. + """ diff --git a/dotflow/cloud/aws/services/s3.py b/dotflow/cloud/aws/services/s3.py index afe6540f..0b412415 100644 --- a/dotflow/cloud/aws/services/s3.py +++ b/dotflow/cloud/aws/services/s3.py @@ -45,3 +45,26 @@ def write(self, key: str, data: list) -> None: Body=dumps(data), ContentType="application/json", ) + + def delete_prefix(self, sub_prefix: str) -> None: + """Delete every object whose key starts with prefix + sub_prefix. + + Empty ``sub_prefix`` is rejected to avoid accidentally wiping + the entire bucket prefix. 
+ """ + if not sub_prefix: + raise ValueError("delete_prefix requires a non-empty sub_prefix") + + full_prefix = f"{self.prefix}{sub_prefix}" + paginator = self._s3.get_paginator("list_objects_v2") + + for page in paginator.paginate(Bucket=self.bucket, Prefix=full_prefix): + objects = [ + {"Key": item["Key"]} for item in page.get("Contents", []) + ] + if not objects: + continue + self._s3.delete_objects( + Bucket=self.bucket, + Delete={"Objects": objects}, + ) diff --git a/dotflow/cloud/gcp/services/gcs.py b/dotflow/cloud/gcp/services/gcs.py index 855449a1..36c40f18 100644 --- a/dotflow/cloud/gcp/services/gcs.py +++ b/dotflow/cloud/gcp/services/gcs.py @@ -43,3 +43,17 @@ def write(self, key: str, data: list) -> None: dumps(data), content_type="application/json", ) + + def delete_prefix(self, sub_prefix: str) -> None: + """Delete every blob whose name starts with prefix + sub_prefix. + + Empty ``sub_prefix`` is rejected to avoid accidentally wiping + the entire bucket prefix. + """ + if not sub_prefix: + raise ValueError("delete_prefix requires a non-empty sub_prefix") + + full_prefix = f"{self.prefix}{sub_prefix}" + + for blob in self._client.list_blobs(self._bucket, prefix=full_prefix): + blob.delete() diff --git a/dotflow/core/engine.py b/dotflow/core/engine.py index 09ce4513..c3fa55a7 100644 --- a/dotflow/core/engine.py +++ b/dotflow/core/engine.py @@ -1,5 +1,7 @@ """TaskEngine module""" +from __future__ import annotations + import re from collections.abc import Callable from concurrent.futures import ( diff --git a/dotflow/core/exception.py b/dotflow/core/exception.py index 6e68f56c..b1fd2a89 100644 --- a/dotflow/core/exception.py +++ b/dotflow/core/exception.py @@ -21,6 +21,14 @@ MESSAGE_WORKFLOW_FLAG_CONFLICT = ( "{flag} is only valid with --step and cannot be used with --workflow." ) +MESSAGE_INPUT_CHANGED = ( + "Workflow '{workflow_id}' was previously executed with a different " + "initial_context. 
Pass on_input_change='reset' to discard prior " + "checkpoints, or 'reuse' to ignore the new input." +) +MESSAGE_INVALID_ON_INPUT_CHANGE = ( + "on_input_change must be one of 'reuse', 'reset', 'raise'; got '{value}'." +) class MissingActionDecorator(Exception): @@ -72,6 +80,16 @@ def __init__(self): super().__init__("Unknown") +class InputChangedError(Exception): + def __init__(self, workflow_id: str): + super().__init__(MESSAGE_INPUT_CHANGED.format(workflow_id=workflow_id)) + + +class InvalidOnInputChange(Exception): + def __init__(self, value: str): + super().__init__(MESSAGE_INVALID_ON_INPUT_CHANGE.format(value=value)) + + class TaskError: def __init__(self, error: Exception = None, attempt: int = None) -> None: self.attempt = attempt diff --git a/dotflow/core/fingerprint.py b/dotflow/core/fingerprint.py new file mode 100644 index 00000000..51445323 --- /dev/null +++ b/dotflow/core/fingerprint.py @@ -0,0 +1,45 @@ +"""Input fingerprint helpers for resume + on_input_change policy.""" + +from __future__ import annotations + +from hashlib import sha256 +from json import dumps +from typing import Any + +from dotflow.core.context import Context + +FP_KEY_SUFFIX = "-_input_fp" + + +def fingerprint_of(values: list[Any]) -> str: + """SHA256 of JSON-serialized list of initial_context payloads. + + Inputs must be JSON-serializable (primitives, lists, dicts). + Non-serializable objects fall back to ``repr``, which is **not** + guaranteed to be stable across processes — objects without a custom + ``__repr__`` will produce different fingerprints on every run because + the default repr embeds the memory address. 
+ """ + try: + encoded = dumps(values, sort_keys=True) + except (TypeError, ValueError): + encoded = repr(values) + + return sha256(encoded.encode("utf-8")).hexdigest() + + +def fp_key(workflow_id: str) -> str: + return f"{workflow_id}{FP_KEY_SUFFIX}" + + +def read_fingerprint(storage, workflow_id: str) -> str | None: + context = storage.get(key=fp_key(workflow_id)) + + if isinstance(context, Context) and isinstance(context.storage, str): + return context.storage + + return None + + +def write_fingerprint(storage, workflow_id: str, value: str) -> None: + storage.post(key=fp_key(workflow_id), context=Context(storage=value)) diff --git a/dotflow/core/serializers/transport.py b/dotflow/core/serializers/transport.py index 97eaf3b7..dcf13810 100644 --- a/dotflow/core/serializers/transport.py +++ b/dotflow/core/serializers/transport.py @@ -1,5 +1,7 @@ """Transport serializer module""" +from __future__ import annotations + from pydantic import BaseModel, Field, model_validator from dotflow.core.serializers.task import SerializerTask diff --git a/dotflow/core/types/__init__.py b/dotflow/core/types/__init__.py index 1dafef52..8a27da59 100644 --- a/dotflow/core/types/__init__.py +++ b/dotflow/core/types/__init__.py @@ -1,6 +1,7 @@ """Types __init__ module.""" from dotflow.core.types.execution import TypeExecution +from dotflow.core.types.input_change import VALID_POLICIES, TypeInputChange from dotflow.core.types.overlap import TypeOverlap from dotflow.core.types.status import TypeStatus from dotflow.core.types.storage import TypeStorage @@ -8,8 +9,10 @@ __all__ = [ "TypeExecution", + "TypeInputChange", "TypeOverlap", "TypeStatus", "TypeStorage", + "VALID_POLICIES", "WorkflowStatus", ] diff --git a/dotflow/core/types/input_change.py b/dotflow/core/types/input_change.py new file mode 100644 index 00000000..aee13c17 --- /dev/null +++ b/dotflow/core/types/input_change.py @@ -0,0 +1,31 @@ +"""Type InputChange policy module.""" + +from typing import Annotated + +from 
typing_extensions import Doc + + +class TypeInputChange: + """ + Import: + You can import the **TypeInputChange** class with: + + from dotflow.core.types import TypeInputChange + """ + + REUSE: Annotated[ + str, Doc("Keep prior checkpoints; ignore the new input.") + ] = "reuse" + RESET: Annotated[str, Doc("Drop prior checkpoints and start fresh.")] = ( + "reset" + ) + RAISE: Annotated[str, Doc("Refuse to start; raise InputChangedError.")] = ( + "raise" + ) + + +VALID_POLICIES = ( + TypeInputChange.REUSE, + TypeInputChange.RESET, + TypeInputChange.RAISE, +) diff --git a/dotflow/core/workflow.py b/dotflow/core/workflow.py index 6caf2ff1..028bcad4 100644 --- a/dotflow/core/workflow.py +++ b/dotflow/core/workflow.py @@ -1,5 +1,7 @@ """Workflow module""" +from __future__ import annotations + import sys import threading from collections.abc import Callable @@ -11,9 +13,18 @@ from dotflow.abc.flow import Flow from dotflow.core.context import Context from dotflow.core.engine import TaskEngine -from dotflow.core.exception import ExecutionModeNotExist +from dotflow.core.exception import ( + ExecutionModeNotExist, + InputChangedError, + InvalidOnInputChange, +) +from dotflow.core.fingerprint import ( + fingerprint_of, + read_fingerprint, + write_fingerprint, +) from dotflow.core.task import Task, TaskError -from dotflow.core.types import TypeExecution, TypeStatus +from dotflow.core.types import VALID_POLICIES, TypeExecution, TypeStatus from dotflow.core.types.workflow import WorkflowStatus from dotflow.utils import basic_callback @@ -109,6 +120,8 @@ def __init__( keep_going: bool = False, workflow_id: UUID = None, resume: bool = False, + on_input_change: str = "reuse", + fingerprint: str | None = None, config=None, ) -> None: self.tasks = tasks @@ -118,6 +131,15 @@ def __init__( self.started = datetime.now() self.config = config + if on_input_change not in VALID_POLICIES: + raise InvalidOnInputChange(value=on_input_change) + + self.on_input_change = on_input_change + 
self.fingerprint = fingerprint + + if resume and self.config: + self._enforce_input_fingerprint(tasks=tasks) + if self.config: self.config.tracer.start_workflow( workflow_id=self.workflow_id, mode=mode, tasks_count=len(tasks) @@ -162,6 +184,45 @@ def _background_cleanup(): threading.Thread(target=_background_cleanup, daemon=True).start() + def _enforce_input_fingerprint(self, tasks: list[Task]) -> None: + storage = self.config.storage + workflow_key = str(self.workflow_id) + + if self.fingerprint is not None: + current_fp = self.fingerprint + else: + initial_payloads = [ + getattr(task.initial_context, "storage", None) + for task in tasks + ] + current_fp = fingerprint_of(initial_payloads) + + stored_fp = read_fingerprint(storage=storage, workflow_id=workflow_key) + + if stored_fp is None: + write_fingerprint( + storage=storage, + workflow_id=workflow_key, + value=current_fp, + ) + return + + if stored_fp == current_fp: + return + + if self.on_input_change == "reuse": + return + + if self.on_input_change == "raise": + raise InputChangedError(workflow_id=workflow_key) + + storage.clear(workflow_id=workflow_key) + write_fingerprint( + storage=storage, + workflow_id=workflow_key, + value=current_fp, + ) + def _callback_workflow(self, tasks: list[Task]): duration = (datetime.now() - self.started).total_seconds() final_status = [task.status for task in tasks] diff --git a/dotflow/providers/storage_default.py b/dotflow/providers/storage_default.py index c57e5111..efcbb259 100644 --- a/dotflow/providers/storage_default.py +++ b/dotflow/providers/storage_default.py @@ -20,3 +20,10 @@ def get(self, key: str) -> Context: def key(self, task: Callable) -> str: return f"{task.workflow_id}-{task.task_id}" + + def clear(self, workflow_id: str) -> None: + prefix = f"{workflow_id}-" + stale = [k for k in self._store if k.startswith(prefix)] + + for key in stale: + del self._store[key] diff --git a/dotflow/providers/storage_file.py b/dotflow/providers/storage_file.py index 
94697725..b5eec108 100644 --- a/dotflow/providers/storage_file.py +++ b/dotflow/providers/storage_file.py @@ -59,6 +59,13 @@ def get(self, key: str) -> Context: def key(self, task: Callable): return f"{task.workflow_id}-{task.task_id}.json" + def clear(self, workflow_id: str) -> None: + prefix = f"{workflow_id}-" + + for entry in self.path.iterdir(): + if entry.is_file() and entry.name.startswith(prefix): + entry.unlink(missing_ok=True) + def _loads(self, storage: Any) -> Context: try: return Context(storage=loads(storage)) diff --git a/dotflow/providers/storage_gcs.py b/dotflow/providers/storage_gcs.py index 269c604b..e26b0541 100644 --- a/dotflow/providers/storage_gcs.py +++ b/dotflow/providers/storage_gcs.py @@ -78,6 +78,9 @@ def get(self, key: str) -> Context: def key(self, task: Callable): return f"{task.workflow_id}-{task.task_id}" + def clear(self, workflow_id: str) -> None: + self._gcs.delete_prefix(f"{workflow_id}-") + def _loads(self, storage: Any) -> Context: try: return Context(storage=loads(storage)) diff --git a/dotflow/providers/storage_s3.py b/dotflow/providers/storage_s3.py index 4589a27c..e4516727 100644 --- a/dotflow/providers/storage_s3.py +++ b/dotflow/providers/storage_s3.py @@ -78,6 +78,9 @@ def get(self, key: str) -> Context: def key(self, task: Callable): return f"{task.workflow_id}-{task.task_id}" + def clear(self, workflow_id: str) -> None: + self._s3.delete_prefix(f"{workflow_id}-") + def _loads(self, storage: Any) -> Context: try: return Context(storage=loads(storage)) diff --git a/examples b/examples index a893cb04..97a9ddea 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit a893cb04bfe287c9cd1640aa475b3729422336c7 +Subproject commit 97a9ddea82f145e63258814c2cea0fbe90094389 diff --git a/poetry.lock b/poetry.lock index 86803768..edcd658e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. 
+# This file is automatically @generated by Poetry 2.3.4 and should not be changed by hand. [[package]] name = "aiofiles" @@ -528,6 +528,7 @@ files = [ {file = "boto3-1.42.84-py3-none-any.whl", hash = "sha256:4d03ad3211832484037337292586f71f48707141288d9ac23049c04204f4ab03"}, {file = "boto3-1.42.84.tar.gz", hash = "sha256:6a84b3293a5d8b3adf827a54588e7dcffcf0a85410d7dadca615544f97d27579"}, ] +markers = {main = "extra == \"aws\" or extra == \"deploy-aws\""} [package.dependencies] botocore = ">=1.42.84,<1.43.0" @@ -548,6 +549,7 @@ files = [ {file = "botocore-1.42.84-py3-none-any.whl", hash = "sha256:15f3fe07dfa6545e46a60c4b049fe2bdf63803c595ae4a4eec90e8f8172764f3"}, {file = "botocore-1.42.84.tar.gz", hash = "sha256:234064604c80d9272a5e9f6b3566d260bcaa053a5e05246db90d7eca1c2cf44b"}, ] +markers = {main = "extra == \"aws\" or extra == \"deploy-aws\""} [package.dependencies] jmespath = ">=0.7.1,<2.0.0" @@ -1404,6 +1406,22 @@ files = [ {file = "distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d"}, ] +[[package]] +name = "eval-type-backport" +version = "0.2.2" +description = "Like `typing._eval_type`, but lets older Python versions use newer typing features." 
+optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "python_version < \"3.10\"" +files = [ + {file = "eval_type_backport-0.2.2-py3-none-any.whl", hash = "sha256:cb6ad7c393517f476f96d456d0412ea80f0a8cf96f6892834cd9340149111b0a"}, + {file = "eval_type_backport-0.2.2.tar.gz", hash = "sha256:f0576b4cf01ebb5bd358d02314d31846af5e07678387486e2c798af0e7d849c1"}, +] + +[package.extras] +tests = ["pytest"] + [[package]] name = "exceptiongroup" version = "1.3.1" @@ -2276,6 +2294,7 @@ files = [ {file = "jmespath-1.1.0-py3-none-any.whl", hash = "sha256:a5663118de4908c91729bea0acadca56526eb2698e83de10cd116ae0f4e97c64"}, {file = "jmespath-1.1.0.tar.gz", hash = "sha256:472c87d80f36026ae83c6ddd0f1d05d4e510134ed462851fd5f754c8c3cbb88d"}, ] +markers = {main = "extra == \"aws\" or extra == \"deploy-aws\""} [[package]] name = "librt" @@ -4485,12 +4504,13 @@ files = [ {file = "s3transfer-0.16.0-py3-none-any.whl", hash = "sha256:18e25d66fed509e3868dc1572b3f427ff947dd2c56f844a5bf09481ad3f3b2fe"}, {file = "s3transfer-0.16.0.tar.gz", hash = "sha256:8e990f13268025792229cd52fa10cb7163744bf56e719e0b9cb925ab79abf920"}, ] +markers = {main = "extra == \"aws\" or extra == \"deploy-aws\""} [package.dependencies] -botocore = ">=1.37.4,<2.0a.0" +botocore = ">=1.37.4,<2.0a0" [package.extras] -crt = ["botocore[crt] (>=1.37.4,<2.0a.0)"] +crt = ["botocore[crt] (>=1.37.4,<2.0a0)"] [[package]] name = "sentry-sdk" @@ -5267,4 +5287,4 @@ sentry = ["sentry-sdk"] [metadata] lock-version = "2.1" python-versions = ">=3.9.0" -content-hash = "245f8a624226db8553662055fe5067b465df0a4653837165d6521732cc951d92" +content-hash = "6eb85c7ad083150f524bff66f67397ecc638f3155e7edc729dc684e2fc86b7a5" diff --git a/pyproject.toml b/pyproject.toml index 595e3ca2..b56fb297 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dotflow" -version = "1.0.0.dev5" +version = "1.0.0.dev6" authors = [ { name="Fernando Celmer", email="email@fernandocelmer.com" }, ] @@ -14,7 +14,8 @@ 
dependencies = [ "typing-extensions", "cookiecutter (>=2.0)", "requests", - "python-ulid (>=3.0)" + "python-ulid (>=3.0)", + "eval-type-backport ; python_version < \"3.10\"" ] keywords = [ "pipeline", @@ -67,7 +68,7 @@ sentry = ["sentry-sdk"] [tool.poetry] name = "dotflow" -version = "1.0.0.dev5" +version = "1.0.0.dev6" description = "🎲 Dotflow turns an idea into flow! Lightweight Python library for execution pipelines with retry, parallel, cron and async support." authors = ["Fernando Celmer <email@fernandocelmer.com>"] readme = "README.md" @@ -119,6 +120,7 @@ python-dotenv = "^1.1.0" requests = "^2.32.4" cookiecutter = "^2.0" python-ulid = "^3.0.0" +eval-type-backport = { version = "^0.2", python = "<3.10" } [tool.poetry.group.dev.dependencies] build = "^1.2.2.post1" diff --git a/tests/core/test_fingerprint.py b/tests/core/test_fingerprint.py new file mode 100644 index 00000000..add698f0 --- /dev/null +++ b/tests/core/test_fingerprint.py @@ -0,0 +1,69 @@ +"""Test input fingerprint helpers.""" + +import unittest + +from dotflow.core.context import Context +from dotflow.core.fingerprint import ( + fingerprint_of, + fp_key, + read_fingerprint, + write_fingerprint, +) +from dotflow.providers.storage_default import StorageDefault + + +class TestFingerprintOf(unittest.TestCase): + def test_same_input_same_fingerprint(self): + a = fingerprint_of([{"k": 1, "z": 2}]) + b = fingerprint_of([{"z": 2, "k": 1}]) + + self.assertEqual(a, b) + + def test_different_input_different_fingerprint(self): + a = fingerprint_of([{"k": 1}]) + b = fingerprint_of([{"k": 2}]) + + self.assertNotEqual(a, b) + + def test_falls_back_to_repr_for_non_serializable(self): + class Custom: + def __repr__(self): + return "" + + a = fingerprint_of([Custom()]) + b = fingerprint_of([Custom()]) + + self.assertEqual(a, b) + self.assertEqual(len(a), 64) + + def test_none_payload_yields_stable_fingerprint(self): + a = fingerprint_of([None]) + b = fingerprint_of([None]) + + self.assertEqual(a, b) + + +class 
TestFingerprintStorage(unittest.TestCase): + def test_read_returns_none_when_absent(self): + storage = StorageDefault() + + self.assertIsNone(read_fingerprint(storage=storage, workflow_id="wf")) + + def test_write_then_read_roundtrip(self): + storage = StorageDefault() + write_fingerprint(storage=storage, workflow_id="wf", value="abc123") + + self.assertEqual( + read_fingerprint(storage=storage, workflow_id="wf"), + "abc123", + ) + + def test_fp_key_is_stable_per_workflow(self): + self.assertEqual(fp_key("wf"), fp_key("wf")) + self.assertNotEqual(fp_key("wf-a"), fp_key("wf-b")) + + def test_read_returns_none_for_non_string_storage(self): + storage = StorageDefault() + storage.post(key=fp_key("wf"), context=Context(storage={"not": "str"})) + + self.assertIsNone(read_fingerprint(storage=storage, workflow_id="wf")) diff --git a/tests/core/test_workflow_input_change.py b/tests/core/test_workflow_input_change.py new file mode 100644 index 00000000..caa50be8 --- /dev/null +++ b/tests/core/test_workflow_input_change.py @@ -0,0 +1,172 @@ +"""Tests for the on_input_change policy in Manager.""" + +import unittest +from uuid import uuid4 + +from dotflow import Config, DotFlow, action +from dotflow.core.exception import InputChangedError, InvalidOnInputChange +from dotflow.core.fingerprint import read_fingerprint +from dotflow.providers.storage_default import StorageDefault + + +@action +def step_one(initial_context): + return {"value": initial_context.storage["v"]} + + +def _build(config: Config, payload: dict, workflow_id): + workflow = DotFlow(config=config, workflow_id=workflow_id) + workflow.task.add(step=step_one, initial_context=payload) + return workflow + + +class TestOnInputChangePolicy(unittest.TestCase): + def test_invalid_policy_raises(self): + config = Config(storage=StorageDefault()) + workflow = _build(config, {"v": 1}, str(uuid4())) + + with self.assertRaises(InvalidOnInputChange): + workflow.start(resume=True, on_input_change="bogus") + + def 
test_first_run_records_fingerprint(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + workflow = _build(config, {"v": 1}, workflow_id) + + workflow.start(resume=True) + + self.assertIsNotNone( + read_fingerprint(storage=config.storage, workflow_id=workflow_id) + ) + + def test_same_input_keeps_same_fingerprint(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start(resume=True) + first_fp = read_fingerprint( + storage=config.storage, + workflow_id=workflow_id, + ) + + _build(config, {"v": 1}, workflow_id).start(resume=True) + second_fp = read_fingerprint( + storage=config.storage, + workflow_id=workflow_id, + ) + + self.assertEqual(first_fp, second_fp) + + def test_changed_input_with_raise_policy_throws(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start(resume=True) + + with self.assertRaises(InputChangedError): + _build(config, {"v": 2}, workflow_id).start( + resume=True, + on_input_change="raise", + ) + + def test_changed_input_with_reset_policy_replaces_fingerprint(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start(resume=True) + first_fp = read_fingerprint( + storage=config.storage, + workflow_id=workflow_id, + ) + + _build(config, {"v": 2}, workflow_id).start( + resume=True, + on_input_change="reset", + ) + second_fp = read_fingerprint( + storage=config.storage, + workflow_id=workflow_id, + ) + + self.assertNotEqual(first_fp, second_fp) + + def test_changed_input_with_reuse_policy_keeps_old_fingerprint(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start(resume=True) + first_fp = read_fingerprint( + storage=config.storage, + workflow_id=workflow_id, + ) + + _build(config, {"v": 2}, workflow_id).start( + resume=True, + 
on_input_change="reuse", + ) + second_fp = read_fingerprint( + storage=config.storage, + workflow_id=workflow_id, + ) + + self.assertEqual(first_fp, second_fp) + + def test_no_resume_skips_fingerprint(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start(resume=False) + + self.assertIsNone( + read_fingerprint(storage=config.storage, workflow_id=workflow_id) + ) + + def test_explicit_fingerprint_overrides_payload_hash(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start( + resume=True, + fingerprint="custom-fp", + ) + + self.assertEqual( + read_fingerprint(storage=config.storage, workflow_id=workflow_id), + "custom-fp", + ) + + def test_explicit_fingerprint_makes_payload_changes_invisible(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start( + resume=True, + fingerprint="stable", + ) + + _build(config, {"v": 2}, workflow_id).start( + resume=True, + fingerprint="stable", + on_input_change="raise", + ) + + self.assertEqual( + read_fingerprint(storage=config.storage, workflow_id=workflow_id), + "stable", + ) + + def test_explicit_fingerprint_change_triggers_raise(self): + config = Config(storage=StorageDefault()) + workflow_id = str(uuid4()) + + _build(config, {"v": 1}, workflow_id).start( + resume=True, + fingerprint="fp-a", + ) + + with self.assertRaises(InputChangedError): + _build(config, {"v": 1}, workflow_id).start( + resume=True, + fingerprint="fp-b", + on_input_change="raise", + ) diff --git a/tests/providers/test_storage_default.py b/tests/providers/test_storage_default.py index 2eaa8d4c..b8f216ee 100644 --- a/tests/providers/test_storage_default.py +++ b/tests/providers/test_storage_default.py @@ -66,3 +66,16 @@ def test_post_and_get_with_task(self): self.assertIsInstance(result, Context) self.assertEqual(result.storage, "flow") + + def 
test_clear_removes_only_matching_workflow(self): + storage = StorageDefault() + + storage.post(key="wf-A-task-1", context=Context(storage="a")) + storage.post(key="wf-A-task-2", context=Context(storage="b")) + storage.post(key="wf-B-task-1", context=Context(storage="c")) + + storage.clear(workflow_id="wf-A") + + self.assertIsNone(storage.get(key="wf-A-task-1").storage) + self.assertIsNone(storage.get(key="wf-A-task-2").storage) + self.assertEqual(storage.get(key="wf-B-task-1").storage, "c") diff --git a/tests/providers/test_storage_file.py b/tests/providers/test_storage_file.py index 088117c5..51648cff 100644 --- a/tests/providers/test_storage_file.py +++ b/tests/providers/test_storage_file.py @@ -171,3 +171,16 @@ def test_key(self): self.assertEqual( result, f"{workflow_id}-01ARZ3NDEKTSV4RRFFQ69G5FAV.json" ) + + def test_clear_removes_only_matching_workflow(self): + storage = StorageFile(path=self.path) + + storage.post(key="wf-A-task-1.json", context=Context(storage="a")) + storage.post(key="wf-A-task-2.json", context=Context(storage="b")) + storage.post(key="wf-B-task-1.json", context=Context(storage="c")) + + storage.clear(workflow_id="wf-A") + + self.assertFalse(storage.path.joinpath("wf-A-task-1.json").exists()) + self.assertFalse(storage.path.joinpath("wf-A-task-2.json").exists()) + self.assertTrue(storage.path.joinpath("wf-B-task-1.json").exists()) diff --git a/tests/providers/test_storage_s3.py b/tests/providers/test_storage_s3.py index 8da9303b..c761c979 100644 --- a/tests/providers/test_storage_s3.py +++ b/tests/providers/test_storage_s3.py @@ -135,3 +135,25 @@ def test_key(self): result = storage.key(task=task) self.assertEqual(result, f"{workflow_id}-01ARZ3NDEKTSV4RRFFQ69G5FAV") + + def test_clear_removes_only_matching_workflow(self): + storage = StorageS3(bucket=BUCKET, prefix=PREFIX, region=REGION) + + storage.post(key="wf-A-task-1", context=Context(storage="a")) + storage.post(key="wf-A-task-2", context=Context(storage="b")) + 
storage.post(key="wf-B-task-1", context=Context(storage="c")) + + storage.clear(workflow_id="wf-A") + + listing = self.conn.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX) + keys = {obj["Key"] for obj in listing.get("Contents", [])} + + self.assertNotIn(f"{PREFIX}wf-A-task-1", keys) + self.assertNotIn(f"{PREFIX}wf-A-task-2", keys) + self.assertIn(f"{PREFIX}wf-B-task-1", keys) + + def test_delete_prefix_rejects_empty(self): + storage = StorageS3(bucket=BUCKET, prefix=PREFIX, region=REGION) + + with self.assertRaises(ValueError): + storage._s3.delete_prefix("")