Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6909441
⚙️ FEATURE-#281: Add InputChangedError and InvalidOnInputChange excep…
FernandoCelmer Apr 27, 2026
cdf7f67
⚙️ FEATURE-#281: Add Storage.clear(workflow_id) to ABC for bulk check…
FernandoCelmer Apr 27, 2026
b163569
⚙️ FEATURE-#281: Implement clear() in StorageDefault
FernandoCelmer Apr 27, 2026
69f26b1
⚙️ FEATURE-#281: Implement clear() in StorageFile
FernandoCelmer Apr 27, 2026
4e29886
⚙️ FEATURE-#281: Add S3.delete_prefix helper for bulk checkpoint dele…
FernandoCelmer Apr 27, 2026
dfeca14
⚙️ FEATURE-#281: Implement clear() in StorageS3 via delete_prefix
FernandoCelmer Apr 27, 2026
d7cdad7
⚙️ FEATURE-#281: Add fingerprint module with hashing and storage helpers
FernandoCelmer Apr 27, 2026
bbc1a55
⚙️ FEATURE-#281: Wire on_input_change policy into Manager
FernandoCelmer Apr 27, 2026
897f463
❤️ TEST-#281: Cover StorageDefault.clear
FernandoCelmer Apr 27, 2026
8cf9758
❤️ TEST-#281: Cover StorageFile.clear
FernandoCelmer Apr 27, 2026
6935499
❤️ TEST-#281: Cover StorageS3.clear and delete_prefix guard
FernandoCelmer Apr 27, 2026
6cee9b0
❤️ TEST-#281: Cover fingerprint helpers
FernandoCelmer Apr 27, 2026
49c09c5
❤️ TEST-#281: Cover on_input_change reuse, reset, raise policies
FernandoCelmer Apr 27, 2026
567c30b
📘 DOCS-#281: Add checkpoint_input_change example
FernandoCelmer Apr 27, 2026
9817fe0
📦 PyPI: Update version to 1.0.0.dev6
actions-user Apr 27, 2026
3353ea2
🪲 BUG-#281: Add __future__ annotations to fix Python 3.9 collection w…
FernandoCelmer Apr 27, 2026
132eba6
📦 PyPI-#281: Add eval_type_backport for Python 3.9 PEP 604 evaluation
FernandoCelmer Apr 27, 2026
2c2535c
📦 PyPI-#281: Regenerate poetry.lock
FernandoCelmer Apr 27, 2026
a67f8b0
📝 PEP8-#281: Fix ruff I001 import order on workflow.py and test_finge…
FernandoCelmer Apr 27, 2026
68127ee
🪲 BUG-#281: Drop json.dumps(default=str) so non-serializable inputs r…
FernandoCelmer Apr 27, 2026
f1de40d
❤️ TEST-#281: Assert repr fallback is stable across calls
FernandoCelmer Apr 27, 2026
2c0c34e
🪲 BUG-#281: Anchor clear() prefix with hyphen separator across providers
FernandoCelmer Apr 27, 2026
85bf225
❤️ TEST-#281: Cover clear() prefix anchoring in StorageDefault
FernandoCelmer Apr 27, 2026
d764f67
⚙️ FEATURE-#281: Add GCS.delete_prefix helper for bulk checkpoint del…
FernandoCelmer Apr 27, 2026
9d4e81d
⚙️ FEATURE-#281: Make Storage.clear abstract so providers must implem…
FernandoCelmer Apr 27, 2026
d6374fc
📝 PEP8-#281: Drop quoted TaskBuilder annotation (redundant with __fut…
FernandoCelmer Apr 27, 2026
90b70fc
⚙️ FEATURE-#281: Move VALID_POLICIES into TypeInputChange module unde…
FernandoCelmer Apr 27, 2026
009d158
♻️ REFACTOR-#281: Drop VALID_POLICIES from fingerprint module
FernandoCelmer Apr 27, 2026
e06de76
⚙️ FEATURE-#281: Accept explicit fingerprint kwarg overriding payload…
FernandoCelmer Apr 27, 2026
da7eec3
❤️ TEST-#281: Cover explicit fingerprint kwarg behavior
FernandoCelmer Apr 27, 2026
5ce5cd7
📘 DOCS-#281: Show explicit fingerprint usage in example
FernandoCelmer Apr 27, 2026
2e69adc
📝 PEP8-#281: Apply ruff format across touched files
FernandoCelmer Apr 27, 2026
a56763e
🔗 CHORE-#281: Bump examples submodule
FernandoCelmer May 2, 2026
87aa544
🪲 BUG-#282: Resolve merge conflict with develop
FernandoCelmer May 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LAST_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0.dev5
1.0.0.dev6
38 changes: 38 additions & 0 deletions docs_src/checkpoint/checkpoint_input_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from hashlib import sha256
from uuid import UUID

from dotflow import Config, DotFlow, action
from dotflow.providers import StorageFile


@action
def step_one(initial_context):
return {"loaded": initial_context.storage}


@action
def step_two(previous_context):
return {"transformed": previous_context.storage}


config = Config(storage=StorageFile())


def main():
payload = {"s3_key": "uploads/data.zip", "v": 1}

workflow = DotFlow(
config=config,
workflow_id=UUID("12345678-1234-5678-1234-567812345678"),
)

workflow.task.add(step=[step_one, step_two], initial_context=payload)

fp = sha256(payload["s3_key"].encode()).hexdigest()
workflow.start(resume=True, on_input_change="reset", fingerprint=fp)

return workflow


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion dotflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Dotflow __init__ module."""

__version__ = "1.0.0.dev5"
__version__ = "1.0.0.dev6"
__description__ = "🎲 Dotflow turns an idea into flow!"

from .core.action import Action as action
Expand Down
2 changes: 2 additions & 0 deletions dotflow/abc/flow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Flow ABC"""

from __future__ import annotations

from abc import ABC, abstractmethod
from uuid import UUID

Expand Down
8 changes: 8 additions & 0 deletions dotflow/abc/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ def get(self, key: str) -> Context:
@abstractmethod
def key(self, task: Callable):
"""Function that returns a key to get and post storage"""

@abstractmethod
def clear(self, workflow_id: str) -> None:
"""Remove every persisted entry under ``workflow_id``.

Comment thread
FernandoCelmer marked this conversation as resolved.
Used by the input-fingerprint reset path when
``on_input_change='reset'``.
"""
23 changes: 23 additions & 0 deletions dotflow/cloud/aws/services/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,26 @@ def write(self, key: str, data: list) -> None:
Body=dumps(data),
ContentType="application/json",
)

def delete_prefix(self, sub_prefix: str) -> None:
"""Delete every object whose key starts with prefix + sub_prefix.

Empty ``sub_prefix`` is rejected to avoid accidentally wiping
the entire bucket prefix.
"""
if not sub_prefix:
raise ValueError("delete_prefix requires a non-empty sub_prefix")

full_prefix = f"{self.prefix}{sub_prefix}"
paginator = self._s3.get_paginator("list_objects_v2")

for page in paginator.paginate(Bucket=self.bucket, Prefix=full_prefix):
objects = [
{"Key": item["Key"]} for item in page.get("Contents", [])
]
if not objects:
continue
self._s3.delete_objects(
Bucket=self.bucket,
Delete={"Objects": objects},
)
14 changes: 14 additions & 0 deletions dotflow/cloud/gcp/services/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,17 @@ def write(self, key: str, data: list) -> None:
dumps(data),
content_type="application/json",
)

def delete_prefix(self, sub_prefix: str) -> None:
"""Delete every blob whose name starts with prefix + sub_prefix.

Empty ``sub_prefix`` is rejected to avoid accidentally wiping
the entire bucket prefix.
"""
if not sub_prefix:
raise ValueError("delete_prefix requires a non-empty sub_prefix")

full_prefix = f"{self.prefix}{sub_prefix}"

for blob in self._client.list_blobs(self._bucket, prefix=full_prefix):
blob.delete()
2 changes: 2 additions & 0 deletions dotflow/core/engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""TaskEngine module"""

from __future__ import annotations

import re
from collections.abc import Callable
from concurrent.futures import (
Expand Down
18 changes: 18 additions & 0 deletions dotflow/core/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
MESSAGE_WORKFLOW_FLAG_CONFLICT = (
"{flag} is only valid with --step and cannot be used with --workflow."
)
MESSAGE_INPUT_CHANGED = (
"Workflow '{workflow_id}' was previously executed with a different "
"initial_context. Pass on_input_change='reset' to discard prior "
"checkpoints, or 'reuse' to ignore the new input."
)
MESSAGE_INVALID_ON_INPUT_CHANGE = (
"on_input_change must be one of 'reuse', 'reset', 'raise'; got '{value}'."
)


class MissingActionDecorator(Exception):
Expand Down Expand Up @@ -72,6 +80,16 @@ def __init__(self):
super().__init__("Unknown")


class InputChangedError(Exception):
def __init__(self, workflow_id: str):
super().__init__(MESSAGE_INPUT_CHANGED.format(workflow_id=workflow_id))


class InvalidOnInputChange(Exception):
def __init__(self, value: str):
super().__init__(MESSAGE_INVALID_ON_INPUT_CHANGE.format(value=value))


class TaskError:
def __init__(self, error: Exception = None, attempt: int = None) -> None:
self.attempt = attempt
Expand Down
45 changes: 45 additions & 0 deletions dotflow/core/fingerprint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Input fingerprint helpers for resume + on_input_change policy."""

from __future__ import annotations

from hashlib import sha256
from json import dumps
from typing import Any

from dotflow.core.context import Context

FP_KEY_SUFFIX = "-_input_fp"


def fingerprint_of(values: list[Any]) -> str:
"""SHA256 of JSON-serialized list of initial_context payloads.

Inputs must be JSON-serializable (primitives, lists, dicts).
Non-serializable objects fall back to ``repr``, which is **not**
guaranteed to be stable across processes — objects without a custom
``__repr__`` will produce different fingerprints on every run because
the default repr embeds the memory address.
"""
try:
encoded = dumps(values, sort_keys=True)
except (TypeError, ValueError):
Comment thread
FernandoCelmer marked this conversation as resolved.
Comment thread
FernandoCelmer marked this conversation as resolved.
encoded = repr(values)

return sha256(encoded.encode("utf-8")).hexdigest()


def fp_key(workflow_id: str) -> str:
return f"{workflow_id}{FP_KEY_SUFFIX}"


def read_fingerprint(storage, workflow_id: str) -> str | None:
context = storage.get(key=fp_key(workflow_id))

if isinstance(context, Context) and isinstance(context.storage, str):
return context.storage

return None


def write_fingerprint(storage, workflow_id: str, value: str) -> None:
storage.post(key=fp_key(workflow_id), context=Context(storage=value))
2 changes: 2 additions & 0 deletions dotflow/core/serializers/transport.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Transport serializer module"""

from __future__ import annotations

from pydantic import BaseModel, Field, model_validator

from dotflow.core.serializers.task import SerializerTask
Expand Down
3 changes: 3 additions & 0 deletions dotflow/core/types/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Types __init__ module."""

from dotflow.core.types.execution import TypeExecution
from dotflow.core.types.input_change import VALID_POLICIES, TypeInputChange
from dotflow.core.types.overlap import TypeOverlap
from dotflow.core.types.status import TypeStatus
from dotflow.core.types.storage import TypeStorage
from dotflow.core.types.workflow import WorkflowStatus

__all__ = [
"TypeExecution",
"TypeInputChange",
"TypeOverlap",
"TypeStatus",
"TypeStorage",
"VALID_POLICIES",
"WorkflowStatus",
]
31 changes: 31 additions & 0 deletions dotflow/core/types/input_change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Type InputChange policy module."""

from typing import Annotated

from typing_extensions import Doc


class TypeInputChange:
"""
Import:
You can import the **TypeInputChange** class with:

from dotflow.core.types import TypeInputChange
"""

REUSE: Annotated[
str, Doc("Keep prior checkpoints; ignore the new input.")
] = "reuse"
RESET: Annotated[str, Doc("Drop prior checkpoints and start fresh.")] = (
"reset"
)
RAISE: Annotated[str, Doc("Refuse to start; raise InputChangedError.")] = (
"raise"
)


VALID_POLICIES = (
TypeInputChange.REUSE,
TypeInputChange.RESET,
TypeInputChange.RAISE,
)
65 changes: 63 additions & 2 deletions dotflow/core/workflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Workflow module"""

from __future__ import annotations

import sys
import threading
from collections.abc import Callable
Expand All @@ -11,9 +13,18 @@
from dotflow.abc.flow import Flow
from dotflow.core.context import Context
from dotflow.core.engine import TaskEngine
from dotflow.core.exception import ExecutionModeNotExist
from dotflow.core.exception import (
ExecutionModeNotExist,
InputChangedError,
InvalidOnInputChange,
)
from dotflow.core.fingerprint import (
fingerprint_of,
read_fingerprint,
write_fingerprint,
)
from dotflow.core.task import Task, TaskError
from dotflow.core.types import TypeExecution, TypeStatus
from dotflow.core.types import VALID_POLICIES, TypeExecution, TypeStatus
from dotflow.core.types.workflow import WorkflowStatus
from dotflow.utils import basic_callback

Expand Down Expand Up @@ -109,6 +120,8 @@ def __init__(
keep_going: bool = False,
workflow_id: UUID = None,
resume: bool = False,
on_input_change: str = "reuse",
fingerprint: str | None = None,
config=None,
) -> None:
self.tasks = tasks
Expand All @@ -118,6 +131,15 @@ def __init__(
self.started = datetime.now()
self.config = config

if on_input_change not in VALID_POLICIES:
raise InvalidOnInputChange(value=on_input_change)

self.on_input_change = on_input_change
self.fingerprint = fingerprint

if resume and self.config:
self._enforce_input_fingerprint(tasks=tasks)

if self.config:
self.config.tracer.start_workflow(
workflow_id=self.workflow_id, mode=mode, tasks_count=len(tasks)
Expand Down Expand Up @@ -162,6 +184,45 @@ def _background_cleanup():

threading.Thread(target=_background_cleanup, daemon=True).start()

def _enforce_input_fingerprint(self, tasks: list[Task]) -> None:
storage = self.config.storage
workflow_key = str(self.workflow_id)

if self.fingerprint is not None:
current_fp = self.fingerprint
else:
initial_payloads = [
getattr(task.initial_context, "storage", None)
for task in tasks
]
current_fp = fingerprint_of(initial_payloads)

stored_fp = read_fingerprint(storage=storage, workflow_id=workflow_key)

if stored_fp is None:
write_fingerprint(
storage=storage,
workflow_id=workflow_key,
value=current_fp,
)
return

if stored_fp == current_fp:
return

if self.on_input_change == "reuse":
return

if self.on_input_change == "raise":
raise InputChangedError(workflow_id=workflow_key)

storage.clear(workflow_id=workflow_key)
write_fingerprint(
storage=storage,
workflow_id=workflow_key,
value=current_fp,
)

def _callback_workflow(self, tasks: list[Task]):
duration = (datetime.now() - self.started).total_seconds()
final_status = [task.status for task in tasks]
Expand Down
7 changes: 7 additions & 0 deletions dotflow/providers/storage_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,10 @@ def get(self, key: str) -> Context:

def key(self, task: Callable) -> str:
return f"{task.workflow_id}-{task.task_id}"

Comment thread
FernandoCelmer marked this conversation as resolved.
def clear(self, workflow_id: str) -> None:
prefix = f"{workflow_id}-"
stale = [k for k in self._store if k.startswith(prefix)]

for key in stale:
del self._store[key]
7 changes: 7 additions & 0 deletions dotflow/providers/storage_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def get(self, key: str) -> Context:
def key(self, task: Callable):
return f"{task.workflow_id}-{task.task_id}.json"

def clear(self, workflow_id: str) -> None:
prefix = f"{workflow_id}-"

Comment thread
FernandoCelmer marked this conversation as resolved.
for entry in self.path.iterdir():
if entry.is_file() and entry.name.startswith(prefix):
entry.unlink(missing_ok=True)

def _loads(self, storage: Any) -> Context:
try:
return Context(storage=loads(storage))
Expand Down
3 changes: 3 additions & 0 deletions dotflow/providers/storage_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ def get(self, key: str) -> Context:
def key(self, task: Callable):
return f"{task.workflow_id}-{task.task_id}"

def clear(self, workflow_id: str) -> None:
self._gcs.delete_prefix(f"{workflow_id}-")

def _loads(self, storage: Any) -> Context:
try:
return Context(storage=loads(storage))
Expand Down
Loading
Loading