fix guard and make stop repeat configurable

This commit is contained in:
K. Hodges 2026-05-20 05:02:32 -07:00
parent 2f2146f47d
commit c12493a248
8 changed files with 193 additions and 9 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

View File

@ -78,6 +78,7 @@ class PipelineConfig:
max_task_retries: int
stages: tuple[StageConfig, ...]
continue_on_task_failure: bool = False
stop_on_repeated_failure_signature_after: int | None = None
@dataclass(frozen=True)
@ -265,6 +266,14 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
pipeline_raw.get("continue_on_task_failure", False),
"pipeline.continue_on_task_failure",
)
stop_on_repeated_failure_signature_after = _optional_int_or_none(
pipeline_raw.get("stop_on_repeated_failure_signature_after"),
"pipeline.stop_on_repeated_failure_signature_after",
)
if stop_on_repeated_failure_signature_after is not None and stop_on_repeated_failure_signature_after < 2:
raise ConfigError(
"Config error: pipeline.stop_on_repeated_failure_signature_after must be two or greater."
)
stages_raw = pipeline_raw.get("stages")
if not isinstance(stages_raw, list) or not stages_raw:
@ -396,6 +405,7 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
max_task_retries=max_task_retries,
stages=tuple(stages),
continue_on_task_failure=continue_on_task_failure,
stop_on_repeated_failure_signature_after=stop_on_repeated_failure_signature_after,
),
experiment=experiment,
)

View File

@ -14,14 +14,32 @@ class EscalationDecision:
reason: str
def evaluate_retry_churn(entries: tuple[RetryMemoryEntry, ...], *, retry_budget: int) -> EscalationDecision:
def evaluate_retry_churn(
    entries: tuple[RetryMemoryEntry, ...],
    *,
    retry_budget: int,
    repeated_signature_after: int | None = None,
) -> EscalationDecision:
    """Decide whether the retry loop should stop and escalate.

    The checks run in this order:

    1. Budget: stop once ``len(entries)`` reaches a positive ``retry_budget``.
    2. History: with fewer than two entries there is nothing to compare.
    3. Signature: when ``repeated_signature_after`` is set and at least that
       many entries exist, stop if the last two entries carry the same
       non-empty ``failure_signature``.
    4. Churn: stop if the last three entries share both ``stage_id`` and
       ``cause``.

    Args:
        entries: Retry history, oldest first.
        retry_budget: Number of entries at which the budget counts as
            exhausted; zero or negative disables the budget check.
        repeated_signature_after: Minimum number of entries before the
            repeated-signature check applies; ``None`` (or a non-positive
            value) disables it.

    Returns:
        The escalation decision for the current retry history.
    """
    attempts = len(entries)

    # The budget is checked before the history guard so that a budget of one
    # is still reported as exhausted after a single failed attempt.
    if retry_budget > 0 and attempts >= retry_budget:
        return EscalationDecision(True, "human review", "Configured retry budget is exhausted.")

    if attempts < 2:
        return EscalationDecision(False, "continue", "Not enough retry history for churn detection.")

    if repeated_signature_after is not None and 0 < repeated_signature_after <= attempts:
        previous_signature, latest_signature = (entry.failure_signature for entry in entries[-2:])
        # Empty signatures mean "no signature recorded" and never count as a repeat.
        if latest_signature and latest_signature == previous_signature:
            return EscalationDecision(
                True,
                "debugger review or larger model",
                "The same failure signature repeated on consecutive retries.",
            )

    recent = entries[-3:]
    if len(recent) == 3:
        same_stage = len({entry.stage_id for entry in recent}) == 1
        same_cause = len({entry.cause for entry in recent}) == 1
        if same_stage and same_cause:
            return EscalationDecision(True, "debugger review or larger model", "The same stage is failing with the same reason repeatedly.")

    return EscalationDecision(False, "continue", "No retry churn detected.")

View File

@ -34,6 +34,8 @@ def classify_failure(output: str, exit_code: int | None = None, modified_files:
text = output or ""
lowered = text.lower()
failing_tests = extract_failing_tests(text)
exception_name = _extract_exception_name(text)
source_path, _ = _extract_traceback_location(text)
missing = re.search(r"No module named ['\"]([^'\"]+)['\"]", text, re.IGNORECASE)
if not missing:
@ -48,6 +50,25 @@ def classify_failure(output: str, exit_code: int | None = None, modified_files:
"do not retry implementation until dependency is resolved",
failing_tests,
)
if exception_name and source_path and _looks_like_project_source(source_path):
if exception_name in {"TypeError", "AttributeError"}:
return FailureClassification(
"API misuse",
f"The implementation is calling an API with an incompatible shape near `{source_path}`.",
0.82,
"Retry implementation with the exception and relevant call site.",
"retry implementation",
failing_tests,
)
if exception_name in {"NameError", "OperationalError", "KeyError", "ValueError", "IndexError"}:
return FailureClassification(
"logic bug",
f"The failure originates in project code near `{source_path}`.",
0.8,
"Send the traceback and touched files back to the implementer.",
"retry implementation",
failing_tests,
)
if re.search(r"\b(syntaxerror|indentationerror|importerror)\b", text, re.IGNORECASE):
return FailureClassification(
"syntax/import error",
@ -113,6 +134,15 @@ def classify_failure(output: str, exit_code: int | None = None, modified_files:
)
def build_failure_signature(output: str, reason: str = "") -> str:
    """Build a compact signature string that identifies a failure.

    The reason and the output are searched together. The signature is the
    exception name, traceback path, traceback line and command (in that
    order, skipping whatever could not be extracted) joined with `` | ``.
    When nothing can be extracted the literal ``unknown-failure`` is returned.
    """
    combined = "\n".join(filter(None, (reason, output)))
    location_path, location_line = _extract_traceback_location(combined)
    fields = (
        _extract_exception_name(combined),
        location_path,
        location_line,
        _extract_command(combined),
    )
    signature = " | ".join(field for field in fields if field)
    return signature or "unknown-failure"
def extract_failing_tests(output: str) -> tuple[str, ...]:
tests: list[str] = []
patterns = (
@ -128,6 +158,56 @@ def extract_failing_tests(output: str) -> tuple[str, ...]:
return tuple(tests)
def _extract_exception_name(text: str) -> str:
candidates = []
for match in re.finditer(r"(?m)^(?:E\s+)?([A-Za-z0-9_.]+(?:Error|Exception|Warning|NameError|TypeError|AttributeError|KeyError|ValueError|IndexError)):\s*(.*)$", text):
candidates.append(match.group(1))
return candidates[-1] if candidates else ""
def _extract_traceback_location(text: str) -> tuple[str, str]:
    """Pick the most relevant ``(path, line)`` location mentioned in *text*.

    Two location formats are collected, in this order: interpreter traceback
    frames (``File "<path>", line <n>, in <name>``) and drive-letter
    ``<path>.py:<n>:`` references. Each candidate is ranked with
    ``_traceback_score``; the highest score wins and ties go to the candidate
    found first. Returns ``("", "")`` when no location is present.
    """
    location_patterns = (
        r'(?m)^\s*File "([^"]+)", line (\d+), in .+$',
        r"(?m)^.*?([A-Za-z]:[\\/][^:\n]+?\.py):(\d+):",
    )
    locations = [
        (found.group(1), found.group(2))
        for pattern in location_patterns
        for found in re.finditer(pattern, text)
    ]
    if not locations:
        return "", ""
    # max() keeps the first of several equally scored candidates.
    return max(locations, key=lambda location: _traceback_score(location[0]))
def _extract_command(text: str) -> str:
candidates = []
for match in re.finditer(r"Command:\s*`([^`]+)`", text):
candidates.append(match.group(1))
return candidates[-1] if candidates else ""
def _looks_like_project_source(path: str) -> bool:
normalized = path.replace("\\", "/").lower()
return "/src/" in normalized or "/tests/" in normalized
def _traceback_score(path: str) -> int:
normalized = path.replace("\\", "/").lower()
score = 0
if normalized.endswith(".py"):
score += 1
if "/src/" in normalized:
score += 10
if "/tests/" in normalized:
score += 8
if "/site-packages/" in normalized or "/_pytest/" in normalized:
score -= 20
return score
def format_failure_classification(result: FailureClassification, *, exit_code: int | None, modified_files: tuple[str, ...]) -> str:
files = "\n".join(f"- `{path}`" for path in modified_files) or "- None"
tests = "\n".join(f"- `{name}`" for name in result.failing_tests) or "- None"

View File

@ -16,7 +16,7 @@ from .dependencies import diagnose_python_dependencies, format_dependency_diagno
from .escalation import evaluate_retry_churn, format_escalation_decision
from .errors import PipelineError
from .errors import NightShiftError
from .failures import classify_failure, format_failure_classification
from .failures import build_failure_signature, classify_failure, format_failure_classification
from .git import ensure_clean_worktree, write_diff_artifact, write_git_artifacts
from .patches import (
DEFAULT_FORBIDDEN_PATHS,
@ -232,7 +232,16 @@ class PipelineRunner:
)
break
retry_count += 1
memory_entry = entry_from_stage(retry_count, result, target_stage)
output = self._read_output(result.output_path)
failure_signature = ""
if stage.type in COMMAND_STAGE_TYPES:
failure_signature = build_failure_signature(output, result.reason)
memory_entry = entry_from_stage(
retry_count,
result,
target_stage,
failure_signature=failure_signature,
)
retry_memory.append(memory_entry)
self.artifacts.write_stage_output(
task.id,
@ -242,6 +251,8 @@ class PipelineRunner:
decision = evaluate_retry_churn(
tuple(retry_memory),
retry_budget=self.config.pipeline.max_task_retries + 1,
repeated_signature_after=self.config.pipeline.stop_on_repeated_failure_signature_after
or self.config.pipeline.max_task_retries,
)
self.artifacts.write_stage_output(
task.id,

View File

@ -14,6 +14,7 @@ class RetryMemoryEntry:
status: str
cause: str
next_stage: str
failure_signature: str
def summarize_retry_memory(entries: tuple[RetryMemoryEntry, ...]) -> str:
@ -23,17 +24,24 @@ def summarize_retry_memory(entries: tuple[RetryMemoryEntry, ...]) -> str:
for entry in entries[-8:]:
lines.append(
f"- Attempt {entry.attempt}: `{entry.stage_id}` returned {entry.status}; "
f"cause: {entry.cause}; next: `{entry.next_stage}`"
f"cause: {entry.cause}; signature: `{entry.failure_signature}`; next: `{entry.next_stage}`"
)
lines.append("")
return "\n".join(lines)
def entry_from_stage(attempt: int, result: StageResult, next_stage: str) -> RetryMemoryEntry:
def entry_from_stage(
    attempt: int,
    result: StageResult,
    next_stage: str,
    *,
    failure_signature: str = "",
) -> RetryMemoryEntry:
    """Create the retry-memory record for one failed stage attempt.

    Args:
        attempt: Number of this retry attempt.
        result: Stage result supplying ``stage_id``, ``status`` and ``reason``
            (the reason is stored as the entry's ``cause``).
        next_stage: Identifier of the stage that runs next.
        failure_signature: Signature of the failure. Defaults to ``""``
            (no signature recorded) so callers that predate the signature
            field keep working with three arguments.

    Returns:
        The populated ``RetryMemoryEntry``.
    """
    return RetryMemoryEntry(
        attempt=attempt,
        stage_id=result.stage_id,
        status=result.status,
        cause=result.reason,
        next_stage=next_stage,
        failure_signature=failure_signature,
    )

View File

@ -3,10 +3,10 @@
from __future__ import annotations
PACKAGE_VERSION = "0.2.4"
PACKAGE_VERSION = "0.2.5"
RELEASE_CHANNEL = "alpha"
hotdog_version = "bratwurst"
topping_version = "relish"
hotdog_version = "chicago"
topping_version = "onions"
HOTDOG_VERSIONS = (
"bratwurst",

View File

@ -5,10 +5,12 @@ import unittest
from nightshift.artifacts import ArtifactStore
from nightshift.config import parse_config, StageConfig
from nightshift.failures import classify_failure
from nightshift.escalation import evaluate_retry_churn
from nightshift.failures import build_failure_signature, classify_failure
from nightshift.integ import cleanup_integration_runs, create_integration_run
from nightshift.patches import validate_patch
from nightshift.pipeline import PipelineRunner
from nightshift.retry_memory import RetryMemoryEntry
from nightshift.tasks import parse_tasks
from tests.test_pipeline import TASK_MD, make_config, _write_common_files
@ -36,6 +38,61 @@ class ReliabilityFeatureTests(unittest.TestCase):
self.assertEqual(result.category, "missing dependency")
self.assertIn("pastebin_app", result.probable_root_cause)
def test_failure_classifier_treats_traceback_into_source_as_logic_bug(self) -> None:
    """A NameError raised from a file under ``src`` is classified as a logic bug."""
    traceback_lines = (
        ' File "C:\\repo\\project\\src\\pastebin_app\\app.py", line 31, in get_db',
        " if 'db' not in g:",
        "NameError: name 'g' is not defined",
    )
    command_output = "\n".join(traceback_lines)

    result = classify_failure(command_output, exit_code=1)

    self.assertEqual(result.category, "logic bug")
    # The root cause should point at the failing project file.
    self.assertIn("src\\pastebin_app\\app.py", result.probable_root_cause)
def test_retry_churn_stops_on_repeated_failure_signature(self) -> None:
    """Two consecutive attempts with one signature stop the retry loop."""
    # Both attempts are identical apart from their attempt number.
    entries = tuple(
        RetryMemoryEntry(
            attempt=attempt_number,
            stage_id="test",
            status="fail",
            cause="Command exited with code 1: python -m pytest -q",
            next_stage="implement",
            failure_signature="NameError | src/pastebin_app/app.py | 31 | python -m pytest -q",
        )
        for attempt_number in (1, 2)
    )

    decision = evaluate_retry_churn(entries, retry_budget=4, repeated_signature_after=2)

    self.assertTrue(decision.should_stop)
    self.assertIn("same failure signature", decision.reason)
def test_build_failure_signature_prefers_project_traceback_over_pytest_cache(self) -> None:
    """The signature names the project frame, not the pytest-internal frame."""
    output_lines = (
        ' File "C:\\repo\\project\\src\\pastebin_app\\app.py", line 31, in get_db',
        "NameError: name 'g' is not defined",
        ' File "C:\\Users\\metis\\...\\site-packages\\_pytest\\cacheprovider.py", line 429, in set',
    )

    signature = build_failure_signature(
        "\n".join(output_lines),
        reason="Command exited with code 1: python -m pytest -q",
    )

    self.assertIn("src\\pastebin_app\\app.py", signature)
    self.assertNotIn("_pytest\\cacheprovider.py", signature)
def test_command_failure_writes_diagnostics_and_retry_memory(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)