nightshift/nightshift/pipeline.py
RJS 67ea77baa6 Replace on_pass with on_status dict for per-status stage routing
on_status replaces the single on_pass field with a mapping that routes
each review status (pass/fail/retry/escalate) to a different target
stage. The lookup order for non-pass statuses is:
  on_status[status] -> on_fail -> next_stage (agent output)

Config parsing validates that on_status keys are valid status names
and all referenced stages exist. Includes test coverage for parsing,
validation errors, pass/fail/escalate routing, and on_fail fallback.
2026-05-23 14:43:50 +00:00

2137 lines
83 KiB
Python

"""Deterministic pipeline runner."""
from __future__ import annotations
from dataclasses import dataclass, replace
import json
from pathlib import Path
import re
import subprocess
from .agents import AgentExecutor
from .artifacts import ArtifactStore
from .commands import CommandExecutor, extract_test_file_paths, render_command_template
from .config import COMMAND_STAGE_TYPES, NightShiftConfig, StageConfig
from .context import ContextManager
from .dependencies import diagnose_python_dependencies, format_dependency_diagnostic
from .escalation import evaluate_retry_churn, format_escalation_decision
from .errors import PipelineError
from .errors import NightShiftError
from .failures import build_failure_signature, classify_failure, format_failure_classification
from .git import ensure_clean_worktree, write_diff_artifact, write_git_artifacts
from .patches import (
DEFAULT_FORBIDDEN_PATHS,
DEFAULT_MAX_CHANGED_LINES,
DEFAULT_MAX_FILES,
FileUpdate,
apply_patch_with_git,
extract_unified_diff,
format_patch_apply_result,
format_validation_result,
generate_patch_from_file_updates,
normalize_patch_text,
parse_file_updates,
validate_patch,
)
from .project_chart import build_project_context_chart
from .reports import ReportGenerator
from .repo_tools import RepoTools, extract_agent_stdout, parse_lookup_requests
from .resources import format_resource_report, parse_resource_requests, satisfy_resource_requests
from .retry_memory import RetryMemoryEntry, entry_from_stage, summarize_retry_memory
from .semantic_index import (
build_semantic_index,
format_search_results,
format_semantic_index,
search_index,
)
from .runlog import RunLogger
from .stages import StageResult
from .tasks import Task, mark_task_completed
from .telemetry import TelemetryEntry, format_telemetry_summary, telemetry_from_stage_output
from .writing_validators import collect_writing_warnings, validate_writing_file_updates
@dataclass(frozen=True)
class PipelineResult:
"""Immutable outcome of running (or refusing to run) the pipeline for one task."""
# Identifier of the task this result belongs to.
task_id: str
# Final task status. Code in this file produces "complete", "failed", and "blocked".
status: str
# Number of retry redirects that were consumed while running the task.
retry_count: int
# Per-stage results in execution order; empty for a blocked task.
stage_results: tuple[StageResult, ...]
# Task artifact directory relative to the project root; "" for a blocked task.
artifact_dir: str
# Human-readable explanation of the final status.
reason: str
@dataclass(frozen=True)
class MultiTaskResult:
"""Immutable aggregate outcome of a multi-task run."""
# "complete" when no task result counted as failed, otherwise "failed".
status: str
# One PipelineResult per task that was attempted or blocked, in processing order.
task_results: tuple[PipelineResult, ...]
# Number of task results whose status is "complete".
completed_count: int
# Number of task results whose status is anything other than "complete"
# (this includes blocked tasks).
failed_count: int
# Human-readable summary of the aggregate status.
reason: str
class PipelineRunner:
"""Execute configured stages for one task."""
def __init__(
    self,
    config: NightShiftConfig,
    artifacts: ArtifactStore | None = None,
    agent_timeout_seconds: int = 600,
    command_timeout_seconds: int = 300,
    logger: RunLogger | None = None,
) -> None:
    """Wire up the collaborators used while running tasks.

    Args:
        config: Loaded NightShift configuration.
        artifacts: Artifact store to use; when falsy one is built from
            ``config`` via ``ArtifactStore.from_config``.
        agent_timeout_seconds: Timeout handed to the agent executor.
        command_timeout_seconds: Timeout handed to the command executor.
        logger: Run logger to use; when falsy a fresh ``RunLogger`` is created.
    """
    self.config = config
    self.artifacts = artifacts or ArtifactStore.from_config(config)
    self.logger = logger or RunLogger()
    self.context = ContextManager(self.artifacts)

    # Every collaborator below shares the same project root, store and logger.
    project_root = config.project.root
    store = self.artifacts
    run_logger = self.logger
    experiment = config.experiment

    self.reports = ReportGenerator(
        project_root,
        store,
        experiment_label=experiment.label,
        prompt_variant=experiment.prompt_variant,
    )
    self.agent_executor = AgentExecutor(
        project_root,
        config.agents,
        store,
        timeout_seconds=agent_timeout_seconds,
        logger=run_logger,
    )
    self.command_executor = CommandExecutor(
        project_root,
        config.safety,
        store,
        timeout_seconds=command_timeout_seconds,
        logger=run_logger,
    )
    self.repo_tools = RepoTools(
        project_root,
        config.safety,
        store,
        logger=run_logger,
    )
def run_task(self, task: Task) -> PipelineResult:
"""Run the configured pipeline stages for ``task`` and return the outcome.

The method snapshots run inputs, walks the stage list with status-based
routing and bounded retries, then writes context, completion, git, report
and telemetry artifacts before returning a ``PipelineResult``.
"""
ensure_clean_worktree(self.config.project.root, self.config.safety.require_clean_worktree)
self.artifacts.initialize_run()
self.logger.bind(self.artifacts)
self.logger.event(
"task.start",
"Starting task",
run_id=self.artifacts.run_id,
task_id=task.id,
task_title=task.title,
)
# Snapshot the run inputs (config, agent prompts, metadata, task, git state)
# before any stage executes.
self.artifacts.write_config_snapshot(self.config.path)
self.artifacts.write_prompt_snapshots(
{
agent_id: self.config.project.root / agent.system_prompt
for agent_id, agent in self.config.agents.items()
}
)
self.artifacts.write_run_metadata(format_run_metadata(self.config))
self.artifacts.write_task_snapshot(task)
self._write_project_context_chart()
write_git_artifacts(self.artifacts, task.id, "before")
self.context.ensure_project_context()
self.context.create_task_context(task)
stages = list(self.config.pipeline.stages)
# Stage id -> position in ``stages`` so routing can jump by stage id.
stage_indexes = {stage.id: index for index, stage in enumerate(stages)}
stage_results: list[StageResult] = []
previous_outputs: dict[str, str] = {}
retry_notes: list[str] = []
retry_memory: list[RetryMemoryEntry] = []
telemetry_entries: list[TelemetryEntry] = []
retry_count = 0
index = 0
final_status = "complete"
final_reason = "Pipeline completed."
# A preflight failure is recorded like a stage result; moving ``index`` past
# the end of ``stages`` means the stage loop below does not run.
preflight_result = self._preflight_task(task, stages)
if preflight_result:
stage_results.append(preflight_result)
final_status = "failed"
final_reason = preflight_result.reason
index = len(stages)
while index < len(stages):
stage = stages[index]
self.logger.event(
"stage.start",
"Starting stage",
run_id=self.artifacts.run_id,
task_id=task.id,
stage_id=stage.id,
stage_type=stage.type,
retry_count=retry_count,
)
# NightShiftError and OSError raised by a stage are converted into a failing
# StageResult instead of propagating out of the task.
try:
result = self._run_stage(stage, task, previous_outputs, retry_notes, retry_count)
except NightShiftError as exc:
result = StageResult(
stage_id=stage.id,
status="fail",
reason=str(exc),
)
except OSError as exc:
result = StageResult(
stage_id=stage.id,
status="fail",
reason=f"Unexpected OS error while running stage: {exc}",
)
stage_results.append(result)
# Delete-then-assign re-inserts the key, so a re-run stage becomes the most
# recently inserted entry in ``previous_outputs``.
if stage.id in previous_outputs:
del previous_outputs[stage.id]
previous_outputs[stage.id] = self._read_context_output(result.output_path)
telemetry_entries.append(self._telemetry_entry(stage, result, retry_count))
self._write_telemetry(task.id, telemetry_entries)
self.logger.event(
"stage.finish",
"Finished stage",
run_id=self.artifacts.run_id,
task_id=task.id,
stage_id=stage.id,
status=result.status,
reason=result.reason,
artifact_path=result.output_path,
)
if result.context_update:
retry_notes.append(f"Context update from '{stage.id}': {result.context_update}")
# --- Routing for a passing stage ---
if result.status == "pass":
# An explicit on_status["pass"] target is consulted first.
if stage.on_status and "pass" in stage.on_status:
target = stage.on_status["pass"]
if target not in stage_indexes:
final_status = "failed"
final_reason = (
f"Stage '{stage.id}' on_status.pass references unknown stage '{target}'."
)
break
self.logger.event(
"stage.next",
"Jumping via on_status.pass",
run_id=self.artifacts.run_id,
task_id=task.id,
stage_id=stage.id,
next_stage=target,
)
index = stage_indexes[target]
continue
# A passing review's own next_stage request is logged and ignored.
if stage.type in {"agent_review", "review"}:
if result.next_stage:
self.logger.event(
"stage.next_ignored",
"Ignoring next_stage from passing review",
run_id=self.artifacts.run_id,
task_id=task.id,
stage_id=stage.id,
requested_next_stage=result.next_stage,
)
index += 1
continue
# Other passing stages may request a jump to a known stage via next_stage.
if result.next_stage:
if result.next_stage not in stage_indexes:
final_status = "failed"
final_reason = (
f"Stage '{stage.id}' requested unknown next stage '{result.next_stage}'."
)
break
self.logger.event(
"stage.next",
"Jumping to requested next stage",
run_id=self.artifacts.run_id,
task_id=task.id,
stage_id=stage.id,
next_stage=result.next_stage,
)
index = stage_indexes[result.next_stage]
continue
index += 1
continue
# --- Handling for a non-pass result ---
# The redirect target is chosen by _resolve_retry_target_stage, which is
# defined outside this view.
target_stage = _resolve_retry_target_stage(stage, result)
analysis_note = self._write_failure_diagnostics(stage, task, result, retry_count)
if analysis_note:
retry_notes.append(analysis_note)
debugger_note = self._run_debugger_if_configured(task, result, retry_notes)
if debugger_note:
retry_notes.append(debugger_note)
if target_stage:
# The retry budget is checked before the target is validated.
if retry_count >= self.config.pipeline.max_task_retries:
final_status = "failed"
final_reason = (
f"Retry limit reached after stage '{stage.id}': {result.reason}"
)
break
if target_stage not in stage_indexes:
final_status = "failed"
final_reason = (
f"Stage '{stage.id}' requested unknown next stage '{target_stage}'."
)
break
retry_count += 1
output = self._read_output(result.output_path)
# A failure signature is only built for command stage types; other stage
# types record an empty signature.
failure_signature = ""
if stage.type in COMMAND_STAGE_TYPES:
failure_signature = build_failure_signature(output, result.reason)
memory_entry = entry_from_stage(
retry_count,
result,
target_stage,
failure_signature=failure_signature,
)
retry_memory.append(memory_entry)
self.artifacts.write_stage_output(
task.id,
"retry-memory.md",
summarize_retry_memory(tuple(retry_memory)),
)
if _repeated_protected_path_violation(tuple(retry_memory)):
final_status = "failed"
final_reason = (
"Escalation policy stopped retries: implementation repeatedly "
"attempted to modify paths outside the stage allowlist."
)
break
# The escalation decision is always written to an artifact, whether or not
# it stops the retries.
decision = evaluate_retry_churn(
tuple(retry_memory),
retry_budget=self.config.pipeline.max_task_retries + 1,
repeated_signature_after=self.config.pipeline.stop_on_repeated_failure_signature_after,
)
self.artifacts.write_stage_output(
task.id,
"escalation-policy.md",
format_escalation_decision(decision),
)
if decision.should_stop:
final_status = "failed"
final_reason = f"Escalation policy stopped retries: {decision.reason}"
break
self.logger.event(
"stage.retry",
"Redirecting after stage result",
run_id=self.artifacts.run_id,
task_id=task.id,
stage_id=stage.id,
status=result.status,
retry_count=retry_count,
next_stage=target_stage,
)
retry_notes.append(self._format_retry_note(retry_count, stage, result, target_stage))
index = stage_indexes[target_stage]
continue
# No redirect target was resolved: the task fails with this stage's status.
final_status = "failed"
final_reason = f"Stage '{stage.id}' returned {result.status}: {result.reason}"
break
# --- Finalisation: context, completion marker, git artifacts, reports ---
context_out_path = self.context.write_context_out(
task,
final_status,
final_reason,
retry_notes,
durable_notes=[
result.context_update
for result in stage_results
if result.context_update
],
)
# The task is only marked completed in the task file when the run completed.
completion_changed = False
if final_status == "complete":
completion_changed = mark_task_completed(
self.config.project.root,
self.config.project.task_file,
task.id,
)
self.artifacts.write_stage_output(
task.id,
"task-completion.md",
format_task_completion(task, final_status, completion_changed),
)
write_git_artifacts(self.artifacts, task.id, "after")
write_diff_artifact(self.artifacts, task.id)
self.reports.write_reports(
task,
final_status,
final_reason,
retry_count,
stage_results,
context_out_path=context_out_path,
)
self._write_telemetry(task.id, telemetry_entries)
self.logger.event(
"task.finish",
"Finished task",
run_id=self.artifacts.run_id,
task_id=task.id,
status=final_status,
retry_count=retry_count,
reason=final_reason,
artifact_path=self.artifacts.create_task_dir(task.id).directory.relative_to(self.config.project.root),
)
# NOTE(review): create_task_dir is called a second time here to obtain the
# directory; presumably it is idempotent -- confirm against ArtifactStore.
return PipelineResult(
task_id=task.id,
status=final_status,
retry_count=retry_count,
stage_results=tuple(stage_results),
artifact_dir=str(self.artifacts.create_task_dir(task.id).directory.relative_to(self.config.project.root)),
reason=final_reason,
)
def _preflight_task(self, task: Task, stages: list[StageConfig]) -> StageResult | None:
    """Check that test files referenced by command stages exist.

    Every command of every command-type stage is rendered for the task, the
    test file paths are extracted from it, and each path is checked under the
    project root.

    Returns:
        ``None`` when nothing is missing; otherwise a failing ``StageResult``
        with stage id ``"preflight"`` after writing ``preflight.md``.
    """
    absent = [
        candidate
        for stage in stages
        if stage.type in COMMAND_STAGE_TYPES
        for command in stage.commands
        for candidate in extract_test_file_paths(render_command_template(command, task.id))
        if not (self.config.project.root / candidate).exists()
    ]
    if not absent:
        return None

    # dict.fromkeys de-duplicates while keeping first-seen order.
    distinct = tuple(dict.fromkeys(absent))
    bullet_list = "\n".join(f"- `{path}`" for path in distinct)
    report = "\n".join(
        (
            "# Task Preflight",
            "",
            "Status: fail",
            "Reason: configured task test file is missing.",
            "",
            "## Missing Files",
            "",
            bullet_list,
            "",
        )
    )
    written = self.artifacts.write_stage_output(task.id, "preflight.md", report)
    failure_reason = (
        "Task preflight failed: configured task test file is missing: "
        + ", ".join(distinct)
    )
    return StageResult(
        "preflight",
        "fail",
        failure_reason,
        output_path=str(written.relative_to(self.config.project.root)),
    )
def run_tasks(self, tasks: list[Task] | tuple[Task, ...]) -> MultiTaskResult:
    """Run each not-yet-completed task in order and aggregate the outcomes.

    A task is recorded as ``"blocked"`` (and not run) when it depends on an id
    that is not among ``tasks`` or on a task that has not completed. After the
    first non-complete outcome the run stops unless
    ``pipeline.continue_on_task_failure`` is set.

    Returns:
        A ``MultiTaskResult`` whose status is ``"complete"`` only when no
        recorded outcome is non-complete.
    """
    self.artifacts.initialize_run()
    self.logger.bind(self.artifacts)
    self.logger.event("run.start", "Starting multi-task run", run_id=self.artifacts.run_id)

    outcomes: list[PipelineResult] = []
    all_ids = {candidate.id for candidate in tasks}
    done_ids = {candidate.id for candidate in tasks if candidate.completed}

    for task in tasks:
        if task.completed:
            continue

        unknown = [dep for dep in task.dependencies if dep not in all_ids]
        pending = [
            dep for dep in task.dependencies if dep in all_ids and dep not in done_ids
        ]
        problems = []
        if unknown:
            problems.append(f"missing dependencies: {', '.join(unknown)}")
        if pending:
            problems.append(f"incomplete dependencies: {', '.join(pending)}")

        if problems:
            outcome = PipelineResult(
                task_id=task.id,
                status="blocked",
                retry_count=0,
                stage_results=(),
                artifact_dir="",
                reason="Task blocked by " + "; ".join(problems),
            )
            outcomes.append(outcome)
            self.logger.event(
                "task.blocked",
                "Task blocked by dependencies",
                run_id=self.artifacts.run_id,
                task_id=task.id,
                reason=outcome.reason,
            )
        else:
            outcome = self.run_task(task)
            outcomes.append(outcome)
            if outcome.status == "complete":
                done_ids.add(task.id)

        # Blocked and failed outcomes both count as non-complete here.
        if outcome.status != "complete" and not self.config.pipeline.continue_on_task_failure:
            break

    completed_count = sum(1 for outcome in outcomes if outcome.status == "complete")
    failed_count = len(outcomes) - completed_count
    if failed_count == 0:
        status, reason = "complete", "All selected tasks completed."
    else:
        status, reason = "failed", "One or more tasks failed."

    self.artifacts.run_summary_path.write_text(
        format_aggregate_run_summary(outcomes, status, reason),
        encoding="utf-8",
    )
    self.logger.event(
        "run.finish",
        "Finished multi-task run",
        run_id=self.artifacts.run_id,
        status=status,
        completed_count=completed_count,
        failed_count=failed_count,
    )
    return MultiTaskResult(
        status=status,
        task_results=tuple(outcomes),
        completed_count=completed_count,
        failed_count=failed_count,
        reason=reason,
    )
def _run_stage(
self,
stage: StageConfig,
task: Task,
previous_outputs: dict[str, str],
retry_notes: list[str],
retry_count: int = 0,
) -> StageResult:
"""Dispatch one stage to the handler for its ``stage.type``.

Raises:
    PipelineError: when ``stage.type`` is not one of the handled types.
"""
# Agent-backed stages: plain agents and reviews share the executor call.
if stage.type in {"agent", "agent_review", "review"}:
context = self.context.read_context(task, retry_notes)
# Review stages receive previous outputs filtered by _review_previous_outputs
# (defined outside this view); plain agents receive them unchanged.
result = self.agent_executor.run_stage(
self._stage_for_retry_agent(stage, retry_count),
task,
_review_previous_outputs(previous_outputs) if stage.type in {"agent_review", "review"} else previous_outputs,
retry_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context=context.retry_context,
)
# For plain agents a satisfied resource request short-circuits; otherwise the
# agent may be re-run with repository lookup results.
if stage.type == "agent":
resource_result = self._maybe_satisfy_resource_request(stage, task, result)
if resource_result is not None:
return resource_result
return self._maybe_rerun_agent_with_repo_lookup(
stage,
task,
result,
previous_outputs,
retry_notes,
context.project_context,
context.task_context,
context.retry_context,
)
# A malformed review result triggers a review re-run.
if stage.type in {"agent_review", "review"} and _is_malformed_review_result(result):
return self._rerun_malformed_review(
stage,
task,
result,
previous_outputs,
retry_notes,
retry_count,
context.project_context,
context.task_context,
)
return result
# Command stages run through the command executor with a per-attempt output.
if stage.type in COMMAND_STAGE_TYPES:
return self.command_executor.run_stage(_stage_with_attempt_output(stage, retry_count), task.id)
# Writer and patch stages delegate to dedicated methods.
if stage.type == "code_writer":
return self._run_code_writer_stage(stage, task, previous_outputs, retry_notes, retry_count)
if stage.type == "file_writer":
return self._run_file_writer_stage(stage, task, previous_outputs, retry_notes, retry_count)
if stage.type == "patch_normalizer":
return self._run_patch_normalizer_stage(stage, task, previous_outputs, retry_notes, retry_count)
if stage.type == "patch_validator":
return self._run_patch_validator_stage(stage, task, previous_outputs, retry_count)
if stage.type == "patch_apply":
return self._run_patch_apply_stage(stage, task, previous_outputs, retry_count)
# repo_context: write the context pack artifact (default name context-pack.md).
if stage.type == "repo_context":
output_path = self.artifacts.write_stage_output(
task.id,
stage.output or "context-pack.md",
self._build_context_pack(task),
)
self.logger.event(
"artifact.write",
"Wrote context pack",
stage_id=stage.id,
task_id=task.id,
artifact_path=output_path.relative_to(self.config.project.root),
)
return StageResult(
stage_id=stage.id,
status="pass",
reason="Context pack written.",
output_path=str(output_path.relative_to(self.config.project.root)),
)
# semantic_context: build the index, write it, then write search results for a
# query made from the task title, description and acceptance criteria.
if stage.type == "semantic_context":
index = build_semantic_index(self.config.project.root, self.config.safety)
index_path = self.artifacts.write_stage_output(
task.id,
"semantic-index.md",
format_semantic_index(index),
)
query = " ".join([task.title, task.description, *task.acceptance_criteria])
results = _task_semantic_results(index, query, task.id)
context_path = self.artifacts.write_stage_output(
task.id,
stage.output or "semantic-context.md",
format_search_results(results, query),
)
self.logger.event(
"artifact.write",
"Wrote semantic context",
stage_id=stage.id,
task_id=task.id,
artifact_path=context_path.relative_to(self.config.project.root),
)
return StageResult(
stage_id=stage.id,
status="pass",
reason="Semantic context written.",
output_path=str(context_path.relative_to(self.config.project.root)),
context_update=f"Semantic index: {index_path.relative_to(self.config.project.root).as_posix()}",
)
# summarize: write the summary artifact (default name final-notes.md).
if stage.type == "summarize":
output_path = self.artifacts.write_stage_output(
task.id,
stage.output or "final-notes.md",
format_summary_stage(task, previous_outputs, retry_notes),
)
return StageResult(
stage_id=stage.id,
status="pass",
reason="Summary written.",
output_path=str(output_path.relative_to(self.config.project.root)),
)
raise PipelineError(f"Pipeline error: unsupported stage type '{stage.type}'.")
def _write_project_context_chart(self) -> Path:
    """Build the project context chart, write it to the artifact store, and return its path."""
    project_root = self.config.project.root
    rendered = build_project_context_chart(project_root, self.config.safety)
    self.artifacts.initialize_run()
    # The chart path is read only after initialize_run(), as in the original flow.
    destination = self.artifacts.project_context_chart_path
    destination.write_text(rendered, encoding="utf-8")
    self.logger.event(
        "artifact.write",
        "Wrote project context chart",
        artifact_path=destination.relative_to(project_root),
    )
    return destination
def _run_code_writer_stage(
self,
stage: StageConfig,
task: Task,
previous_outputs: dict[str, str],
retry_notes: list[str],
retry_count: int = 0,
) -> StageResult:
"""Run a ``code_writer`` stage: ask the agent for a unified diff and save it.

Returns a passing ``StageResult`` pointing at the proposed patch artifact, or
a failing one when no unified diff can be extracted from the agent output.

Raises:
    PipelineError: when the stage does not reference an agent.
"""
if stage.agent is None:
raise PipelineError(f"Pipeline error: code_writer stage '{stage.id}' must reference an agent.")
# Work on a copy so the caller's previous_outputs dict is not mutated.
enriched_outputs = dict(previous_outputs)
context_pack_path = self._latest_task_artifact(task.id, "context-pack.md")
if context_pack_path is not None:
enriched_outputs["context-pack.md"] = context_pack_path.read_text(encoding="utf-8", errors="replace")
chart_path = self.artifacts.project_context_chart_path
if chart_path.exists():
enriched_outputs["project-context-chart.md"] = chart_path.read_text(encoding="utf-8", errors="replace")
context = self.context.read_context(task, retry_notes)
agent_stage = self._writer_agent_stage(stage, retry_count)
result = self.agent_executor.run_stage(
agent_stage,
task,
enriched_outputs,
retry_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context=context.retry_context,
)
stdout = self._read_agent_stdout(result.output_path)
lookup_requests = parse_lookup_requests(stdout)
# Re-run once with lookup results only when the agent asked for lookups and
# its output contains no "diff --git " marker.
if lookup_requests and "diff --git " not in stdout:
lookup_context = self.repo_tools.execute_requests(
task.id,
lookup_requests,
filename="implementation-files-inspected.md",
)
self.logger.event(
"agent.rerun",
"Re-running code writer with repo lookup context",
stage_id=stage.id,
task_id=task.id,
lookup_count=len(lookup_requests),
)
rerun_outputs = dict(enriched_outputs)
rerun_outputs["repo_lookup_results"] = lookup_context
rerun_notes = [
*retry_notes,
"Repository lookup results have been provided. Return the unified diff now; do not request more lookups.",
]
# The re-run builds its retry context from rerun_notes directly instead of
# using context.retry_context.
result = self.agent_executor.run_stage(
agent_stage,
task,
rerun_outputs,
rerun_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context="\n".join(f"- {note}" for note in rerun_notes),
)
stdout = self._read_agent_stdout(result.output_path)
try:
patch = extract_unified_diff(stdout)
except PipelineError as exc:
# NOTE(review): the failure summary uses a fixed filename while the success
# path below uses _writer_summary_filename(stage, retry_count) -- confirm
# whether this difference is intentional.
self.artifacts.write_stage_output(
task.id,
"implementation-summary.md",
f"# Implementation Summary\n\nStatus: fail\nReason: {exc}\n",
)
return StageResult(stage.id, "fail", str(exc), output_path=result.output_path)
patch_filename = _writer_patch_filename(stage, retry_count)
summary_filename = _writer_summary_filename(stage, retry_count)
proposed_path = self.artifacts.write_stage_output(task.id, patch_filename, patch)
summary_path = self.artifacts.write_stage_output(
task.id,
summary_filename,
format_implementation_summary(
stage.id,
proposed_path.relative_to(self.config.project.root).as_posix(),
retry_count=retry_count,
retry_notes=retry_notes,
),
)
self.logger.event(
"artifact.write",
"Wrote proposed patch",
stage_id=stage.id,
task_id=task.id,
artifact_path=proposed_path.relative_to(self.config.project.root),
)
return StageResult(
stage.id,
"pass",
"Proposed patch written.",
output_path=str(proposed_path.relative_to(self.config.project.root)),
context_update=f"Implementation summary: {summary_path.relative_to(self.config.project.root).as_posix()}",
)
def _run_file_writer_stage(
self,
stage: StageConfig,
task: Task,
previous_outputs: dict[str, str],
retry_notes: list[str],
retry_count: int = 0,
) -> StageResult:
"""Run a ``file_writer`` stage: turn agent file blocks into a patch artifact.

The agent output is parsed into file updates and converted to a patch. On a
parse/patch error the method tries, in order: keeping only allowed file
blocks, one strict re-prompt of the agent, and finally treating the output
as a unified diff.

Raises:
    PipelineError: when the stage does not reference an agent.
"""
if stage.agent is None:
raise PipelineError(f"Pipeline error: file_writer stage '{stage.id}' must reference an agent.")
# The inputs handed to the agent depend on the kind of stage (state update,
# scene edit, or anything else).
if _is_state_update_stage(stage):
enriched_outputs = _state_update_previous_outputs(previous_outputs)
allowed_file_contents = self._allowed_file_contents(stage)
if allowed_file_contents:
enriched_outputs["current_allowed_files"] = allowed_file_contents
elif _is_scene_edit_stage(stage):
enriched_outputs = _file_writer_previous_outputs(previous_outputs, retry_count)
current_scene = self._task_scene_file_contents(task)
if current_scene:
enriched_outputs["current_scene_file"] = current_scene
else:
enriched_outputs = _file_writer_previous_outputs(previous_outputs, retry_count)
context_pack_path = self._latest_task_artifact(task.id, "context-pack.md")
if context_pack_path is not None:
enriched_outputs["context-pack.md"] = context_pack_path.read_text(encoding="utf-8", errors="replace")
chart_path = self.artifacts.project_context_chart_path
if chart_path.exists():
enriched_outputs["project-context-chart.md"] = chart_path.read_text(encoding="utf-8", errors="replace")
context = self.context.read_context(task, retry_notes)
agent_stage = self._writer_agent_stage(stage, retry_count)
result = self.agent_executor.run_stage(
agent_stage,
task,
enriched_outputs,
retry_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context=context.retry_context,
)
stdout = self._read_agent_stdout(result.output_path)
lookup_requests = parse_lookup_requests(stdout)
# Re-run with lookup results only when the agent asked for lookups and its
# output has no file/path fenced block marker (checked case-insensitively).
if lookup_requests and "```file:" not in stdout.lower() and "```path:" not in stdout.lower():
lookup_context = self.repo_tools.execute_requests(
task.id,
lookup_requests,
filename="implementation-files-inspected.md",
)
self.logger.event(
"agent.rerun",
"Re-running file writer with repo lookup context",
stage_id=stage.id,
task_id=task.id,
lookup_count=len(lookup_requests),
)
rerun_outputs = dict(enriched_outputs)
rerun_outputs["repo_lookup_results"] = lookup_context
rerun_notes = [
*retry_notes,
"Repository lookup results have been provided. Return complete file blocks now; do not request more lookups.",
]
result = self.agent_executor.run_stage(
agent_stage,
task,
rerun_outputs,
rerun_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context="\n".join(f"- {note}" for note in rerun_notes),
)
stdout = self._read_agent_stdout(result.output_path)
# invalid_rerun_done limits the strict re-prompt below to a single attempt.
invalid_rerun_done = False
candidate_index_path: Path | None = None
warning_path: Path | None = None
while True:
# Reset each pass so the except handler can read ``updates`` even when
# parse_file_updates itself raised.
updates: tuple[FileUpdate, ...] = ()
try:
updates = parse_file_updates(stdout)
candidate_index_path = self._write_file_writer_candidates(
task.id,
stage,
updates,
retry_count,
)
if _is_writing_file_writer_stage(stage):
validate_writing_file_updates(updates, self.config.project.root)
warning_path = self._write_file_writer_warnings(task.id, stage, updates, retry_count)
patch = generate_patch_from_file_updates(
updates,
self.config.project.root,
self.config.safety,
allowed_paths=stage.allowed_paths,
forbidden_paths=stage.forbidden_paths or DEFAULT_FORBIDDEN_PATHS,
)
patch_reason = "Deterministic patch written from file blocks."
log_message = "Wrote deterministic patch from file blocks"
break
except PipelineError as exc:
reason = _file_writer_error_reason(stage, str(exc))
# Fallback 1: some (but not all) blocks are allowed and the error is about
# a disallowed path -- build the patch from the allowed blocks only.
allowed_updates = _filter_allowed_file_updates(updates, stage)
if (
allowed_updates
and len(allowed_updates) < len(updates)
and "not allowed for this stage" in str(exc)
):
# NOTE(review): a PipelineError raised by the calls in this branch is not
# handled by this except clause -- confirm that propagating is intended.
if _is_writing_file_writer_stage(stage):
validate_writing_file_updates(allowed_updates, self.config.project.root)
warning_path = self._write_file_writer_warnings(
task.id,
stage,
allowed_updates,
retry_count,
)
patch = generate_patch_from_file_updates(
allowed_updates,
self.config.project.root,
self.config.safety,
allowed_paths=stage.allowed_paths,
forbidden_paths=stage.forbidden_paths or DEFAULT_FORBIDDEN_PATHS,
)
patch_reason = "Deterministic patch written from allowed file blocks; disallowed file blocks were ignored."
log_message = "Wrote deterministic patch from allowed file blocks"
self.logger.event(
"file_writer.disallowed_blocks_ignored",
"Ignored disallowed file blocks from file writer output",
stage_id=stage.id,
task_id=task.id,
)
break
# Fallback 2: no file blocks and no diff marker -- re-prompt the agent once
# with stricter notes, then parse the new output.
if (
"no file blocks found" in str(exc)
and "diff --git " not in stdout
and not invalid_rerun_done
):
invalid_rerun_done = True
self.logger.event(
"agent.rerun",
"Re-running file writer after invalid output",
stage_id=stage.id,
task_id=task.id,
)
rerun_outputs = dict(enriched_outputs)
rerun_outputs["invalid_file_writer_output_summary"] = _invalid_file_writer_output_summary(
stdout,
reason,
)
strict_notes = [
*retry_notes,
"Previous file_writer output was invalid. Return complete file blocks now. Do not output lookup_requests, prose, or 'lookup failed'.",
_file_writer_repair_format_note(stage),
]
result = self.agent_executor.run_stage(
agent_stage,
task,
rerun_outputs,
strict_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context="\n".join(f"- {note}" for note in strict_notes),
)
stdout = self._read_agent_stdout(result.output_path)
continue
# Fallback 3: try to use the raw output as a unified diff.
try:
patch = normalize_patch_text(stdout)
except PipelineError:
summary_filename = _writer_summary_filename(stage, retry_count)
# "No changes" is reported as a pass that asks the runner to jump to the
# first stage after the patch stages.
if "generated patch has no changes" in reason:
next_stage = self._stage_after_patch_flow(stage.id)
reason = self._no_changes_reason(retry_count)
summary_path = self.artifacts.write_stage_output(
task.id,
summary_filename,
f"# Implementation Summary\n\nStatus: pass\nReason: {reason}\n",
)
return StageResult(
stage.id,
"pass",
reason,
output_path=result.output_path,
next_stage=next_stage,
context_update=(
f"Implementation summary: "
f"{summary_path.relative_to(self.config.project.root).as_posix()}"
),
)
self.artifacts.write_stage_output(
task.id,
summary_filename,
f"# Implementation Summary\n\nStatus: fail\nReason: {reason}\n",
)
return StageResult(stage.id, "fail", reason, output_path=result.output_path)
patch_reason = "Fallback patch written from unified diff output."
log_message = "Wrote fallback patch from unified diff output"
break
# A patch was produced by one of the paths above: persist it and its summary.
patch_filename = _writer_patch_filename(stage, retry_count)
summary_filename = _writer_summary_filename(stage, retry_count)
proposed_path = self.artifacts.write_stage_output(task.id, patch_filename, patch)
summary_path = self.artifacts.write_stage_output(
task.id,
summary_filename,
format_implementation_summary(
stage.id,
proposed_path.relative_to(self.config.project.root).as_posix(),
retry_count=retry_count,
retry_notes=retry_notes,
candidate_index_path=(
candidate_index_path.relative_to(self.config.project.root).as_posix()
if candidate_index_path
else None
),
),
)
self.logger.event(
"artifact.write",
log_message,
stage_id=stage.id,
task_id=task.id,
artifact_path=proposed_path.relative_to(self.config.project.root),
)
return StageResult(
stage.id,
"pass",
patch_reason,
output_path=str(proposed_path.relative_to(self.config.project.root)),
context_update=_format_writer_context_update(
self.config.project.root,
summary_path,
warning_path,
),
)
def _write_file_writer_candidates(
    self,
    task_id: str,
    stage: StageConfig,
    updates: tuple[FileUpdate, ...],
    retry_count: int,
) -> Path:
    """Save each raw file block as a candidate artifact and write an index.

    Artifacts go under ``candidate-files/<stage id>`` (with a
    ``-retry-<n>`` suffix when ``retry_count`` is non-zero).

    Returns:
        The path of the written ``index.md``.

    Raises:
        PipelineError: when ``updates`` is empty.
    """
    if not updates:
        raise PipelineError("File writer error: no candidate file blocks found.")

    retry_suffix = f"-retry-{retry_count}" if retry_count else ""
    base = f"candidate-files/{stage.id}{retry_suffix}"

    index_lines = [
        "# Candidate Files",
        "",
        f"Stage: `{stage.id}`",
        f"Retry: {retry_count}",
        "",
        "These are raw file blocks extracted before patch validation or apply.",
        "",
        "## Files",
        "",
    ]
    used_names: set[str] = set()
    for position, update in enumerate(updates, start=1):
        safe_name = _candidate_artifact_name(update.path)
        filename = f"{position:03d}-{safe_name}"
        # Disambiguate a name that was already used for an earlier block.
        while filename in used_names:
            filename = f"{position:03d}-{len(used_names):03d}-{safe_name}"
        used_names.add(filename)

        written = self.artifacts.write_stage_output(
            task_id,
            f"{base}/{filename}",
            update.content,
        )
        relative = written.relative_to(self.config.project.root).as_posix()
        index_lines += [
            f"- Source path: `{update.path}`",
            f"  Artifact: `{relative}`",
        ]
    index_lines.append("")
    return self.artifacts.write_stage_output(
        task_id,
        f"{base}/index.md",
        "\n".join(index_lines),
    )
def _write_file_writer_warnings(
    self,
    task_id: str,
    stage: StageConfig,
    updates: tuple[FileUpdate, ...],
    retry_count: int,
) -> Path | None:
    """Write a warnings artifact for ``updates`` when any writing warnings exist.

    Returns:
        The path of the written artifact, or ``None`` when no warnings were
        collected (nothing is written in that case).
    """
    collected = collect_writing_warnings(updates, self.config.project.root)
    if not collected:
        return None

    artifact_name = _attempt_filename(f"{stage.id}-warnings.md", retry_count)
    header = (
        "# Writing Warnings",
        "",
        f"Stage: `{stage.id}`",
        "",
        "These are soft writing concerns. They do not block artifact creation.",
        "",
        "## Warnings",
        "",
    )
    bullets = [f"- {warning}" for warning in collected]
    document = "\n".join([*header, *bullets, ""])
    return self.artifacts.write_stage_output(task_id, artifact_name, document)
def _allowed_file_contents(self, stage: StageConfig, max_chars: int = 2400) -> str:
    """Render the current contents of the stage's allowed files as markdown.

    Each entry of ``stage.allowed_paths`` that resolves to an existing file
    under the project root becomes a ``## <path>`` heading followed by a
    fenced ``text`` block with the compacted content. Entries that are not
    files are skipped.

    Returns:
        The joined sections with surrounding whitespace stripped; ``""`` when
        no allowed path is an existing file.
    """
    rendered: list[str] = []
    for relative_path in stage.allowed_paths:
        candidate = self.config.project.root / relative_path
        if candidate.is_file():
            text = candidate.read_text(encoding="utf-8", errors="replace")
            excerpt = _compact_previous_output(text, max_chars=max_chars).rstrip()
            rendered += [f"## {relative_path}", "", "```text", excerpt, "```", ""]
    return "\n".join(rendered).strip()
def _task_scene_file_contents(self, task: Task, max_chars: int = 10000) -> str:
    """Render the current contents of the task's story chapter files as markdown.

    Paths come from ``_task_story_chapter_paths(task)``. Each one that is an
    existing file under the project root becomes a ``## <path>`` heading
    followed by a fenced ``text`` block with the compacted content.

    Returns:
        The joined sections with surrounding whitespace stripped; ``""`` when
        none of the paths is an existing file.
    """
    rendered: list[str] = []
    for relative_path in _task_story_chapter_paths(task):
        candidate = self.config.project.root / relative_path
        if candidate.is_file():
            text = candidate.read_text(encoding="utf-8", errors="replace")
            excerpt = _compact_previous_output(text, max_chars=max_chars).rstrip()
            rendered += [f"## {relative_path}", "", "```text", excerpt, "```", ""]
    return "\n".join(rendered).strip()
def _writer_agent_stage(self, stage: StageConfig, retry_count: int) -> StageConfig:
    """Return the retry-adjusted stage with a per-attempt agent output filename.

    The output is ``<stage id>-agent-output.md`` on the first attempt and
    ``<stage id>-agent-output-<retry_count>.md`` on retries.
    """
    retry_stage = self._stage_for_retry_agent(stage, retry_count)
    if retry_count:
        output_name = f"{stage.id}-agent-output-{retry_count}.md"
    else:
        output_name = f"{stage.id}-agent-output.md"
    return replace(retry_stage, output=output_name)
def _stage_after_patch_flow(self, current_stage_id: str) -> str | None:
    """Return the id of the first non-patch stage after ``current_stage_id``.

    Stages of type ``patch_normalizer``, ``patch_validator`` and
    ``patch_apply`` are skipped. Returns ``None`` when the stage id is not
    configured or only patch stages (or nothing) follow it.
    """
    ordered = list(self.config.pipeline.stages)

    # Keep the last matching position, mirroring an id -> index mapping where
    # later entries overwrite earlier ones.
    start = None
    for position, candidate in enumerate(ordered):
        if candidate.id == current_stage_id:
            start = position
    if start is None:
        return None

    skipped_types = {"patch_normalizer", "patch_validator", "patch_apply"}
    return next(
        (
            candidate.id
            for candidate in ordered[start + 1:]
            if candidate.type not in skipped_types
        ),
        None,
    )
def _no_changes_reason(self, retry_count: int) -> str:
if retry_count:
return (
"File writer produced no changes relative to the current workspace. "
"The previous patch may already be applied; skipping patch stages and "
"continuing with verification."
)
return (
"File writer produced no changes relative to the current workspace. "
"The task may already be applied locally; skipping patch stages and "
"continuing with verification."
)
def _run_patch_normalizer_stage(
    self,
    stage: StageConfig,
    task: Task,
    previous_outputs: dict[str, str],
    retry_notes: list[str],
    retry_count: int = 0,
) -> StageResult:
    """Normalize the latest patch-like output and store it as an artifact.

    When the stage has an agent configured, the agent is run first with the
    current patch exposed as ``patch_input`` and its stdout replaces the
    patch source. The source is then passed through ``normalize_patch_text``.

    Returns a ``fail`` result carrying the error text when normalization
    raises ``PipelineError``; otherwise a ``pass`` result whose
    ``output_path`` is the written artifact relative to the project root.
    A ``PipelineError`` raised while locating the previous patch is not
    caught here.
    """
    source = _latest_patch_like_output(previous_outputs)
    if stage.agent is not None:
        # Read the context once so the three fields passed to the agent come
        # from a single read instead of three separate reads.
        context = self.context.read_context(task, retry_notes)
        result = self.agent_executor.run_stage(
            stage,
            task,
            {"patch_input": source, **previous_outputs},
            retry_notes,
            project_context=context.project_context,
            task_context=context.task_context,
            retry_context=context.retry_context,
        )
        source = self._read_agent_stdout(result.output_path)
    try:
        patch = normalize_patch_text(source)
    except PipelineError as exc:
        return StageResult(stage.id, "fail", str(exc))
    output_path = self.artifacts.write_stage_output(
        task.id,
        _attempt_filename(stage.output or "normalized.patch", retry_count),
        patch,
    )
    self.logger.event(
        "artifact.write",
        "Wrote normalized patch",
        stage_id=stage.id,
        task_id=task.id,
        artifact_path=output_path.relative_to(self.config.project.root),
    )
    return StageResult(
        stage.id,
        "pass",
        "Normalized patch written.",
        output_path=str(output_path.relative_to(self.config.project.root)),
    )
def _run_patch_validator_stage(
    self,
    stage: StageConfig,
    task: Task,
    previous_outputs: dict[str, str],
    retry_count: int = 0,
) -> StageResult:
    """Validate the latest patch-like output and write a validation report.

    The patch is normalized and checked with ``validate_patch`` using the
    stage's limits, falling back to the module defaults for file count,
    changed lines and forbidden paths. A ``PipelineError`` from either step
    produces a ``fail`` result and a short failure report; otherwise the
    formatted validation result is written and a ``pass`` result returned.
    """
    report_name = _attempt_filename(stage.output or "patch-validation.md", retry_count)
    source = _latest_patch_like_output(previous_outputs)
    root = self.config.project.root
    try:
        validation = validate_patch(
            normalize_patch_text(source),
            root,
            self.config.safety,
            max_files=stage.max_files or DEFAULT_MAX_FILES,
            max_changed_lines=stage.max_lines or DEFAULT_MAX_CHANGED_LINES,
            max_delete_ratio=stage.max_delete_ratio,
            allowed_paths=stage.allowed_paths,
            forbidden_paths=stage.forbidden_paths or DEFAULT_FORBIDDEN_PATHS,
        )
    except PipelineError as exc:
        failure_report = f"# Patch Validation\n\nStatus: fail\nReason: {exc}\n"
        failure_path = self.artifacts.write_stage_output(task.id, report_name, failure_report)
        return StageResult(
            stage.id,
            "fail",
            str(exc),
            output_path=str(failure_path.relative_to(root)),
        )
    report_path = self.artifacts.write_stage_output(
        task.id,
        report_name,
        format_validation_result(validation),
    )
    return StageResult(
        stage.id,
        "pass",
        "Patch validation passed.",
        output_path=str(report_path.relative_to(root)),
    )
def _run_patch_apply_stage(
    self,
    stage: StageConfig,
    task: Task,
    previous_outputs: dict[str, str],
    retry_count: int = 0,
) -> StageResult:
    """Validate the latest patch again, then apply it through git.

    The patch is normalized and validated with the stage's limits before
    anything is applied; a ``PipelineError`` there yields a ``fail`` result
    with a short report. Otherwise the patch is stored as ``applied.patch``
    (attempt-suffixed on retries), git artifacts are captured before and
    after the apply call, and a report of the apply result is written.
    ``stage.mode`` defaults to ``"dry_run"`` when unset.
    """
    root = self.config.project.root

    def relative(path: Path) -> str:
        return str(path.relative_to(root))

    report_name = _attempt_filename(stage.output or "patch-apply-output.txt", retry_count)
    source = _latest_patch_like_output(previous_outputs)
    try:
        patch = normalize_patch_text(source)
        validate_patch(
            patch,
            root,
            self.config.safety,
            max_files=stage.max_files or DEFAULT_MAX_FILES,
            max_changed_lines=stage.max_lines or DEFAULT_MAX_CHANGED_LINES,
            max_delete_ratio=stage.max_delete_ratio,
            allowed_paths=stage.allowed_paths,
            forbidden_paths=stage.forbidden_paths or DEFAULT_FORBIDDEN_PATHS,
        )
    except PipelineError as exc:
        failure_path = self.artifacts.write_stage_output(
            task.id,
            report_name,
            f"# Patch Apply\n\nStatus: fail\nReason: {exc}\n",
        )
        return StageResult(stage.id, "fail", str(exc), output_path=relative(failure_path))

    applied_path = self.artifacts.write_stage_output(
        task.id,
        _attempt_filename("applied.patch", retry_count),
        patch,
    )
    # Snapshot git state on both sides of the apply call.
    write_git_artifacts(self.artifacts, task.id, "before-patch-apply")
    mode = stage.mode or "dry_run"
    apply_result = apply_patch_with_git(applied_path, root, mode=mode)
    write_git_artifacts(self.artifacts, task.id, "after-patch-apply")

    report_path = self.artifacts.write_stage_output(
        task.id,
        report_name,
        format_patch_apply_result(
            apply_result,
            applied_path.relative_to(root).as_posix(),
        ),
    )
    report_relative = relative(report_path)
    if apply_result.status == "pass":
        return StageResult(
            stage.id,
            "pass",
            "Patch dry run passed." if mode == "dry_run" else "Patch applied.",
            output_path=report_relative,
        )
    return StageResult(
        stage.id,
        "fail",
        f"Patch apply failed with code {apply_result.exit_code}.",
        output_path=report_relative,
        context_update=apply_result.stderr.strip() or apply_result.stdout.strip(),
    )
def _latest_task_artifact(self, task_id: str, filename: str) -> Path | None:
path = self.artifacts.create_task_dir(task_id).directory / filename
return path if path.exists() else None
def _stage_for_retry_agent(self, stage: StageConfig, retry_count: int) -> StageConfig:
if not stage.agent_pool:
return stage
index = min(retry_count, len(stage.agent_pool) - 1)
return replace(stage, agent=stage.agent_pool[index])
def _maybe_satisfy_resource_request(
    self,
    stage: StageConfig,
    task: Task,
    result: StageResult,
) -> StageResult | None:
    """Satisfy resource requests found in the agent's stdout, if there are any.

    Returns ``None`` when the stdout contains no resource requests. Otherwise
    the requests are satisfied, a ``resource-requests.md`` report is written,
    and a ``pass`` result is returned whose ``context_update`` lists the
    generated paths relative to the project root.
    """
    agent_stdout = self._read_agent_stdout(result.output_path)
    requests = parse_resource_requests(agent_stdout)
    if not requests:
        return None
    root = self.config.project.root
    created = satisfy_resource_requests(self.artifacts, task.id, requests)
    report_path = self.artifacts.write_stage_output(
        task.id,
        "resource-requests.md",
        format_resource_report(requests, created, root),
    )
    report_relative = str(report_path.relative_to(root))
    created_listing = ", ".join(item.relative_to(root).as_posix() for item in created)
    return StageResult(
        stage.id,
        "pass",
        "Blocked resource requests were satisfied in the active run directory.",
        output_path=report_relative,
        context_update="Generated run-local resources: " + created_listing,
    )
def _write_failure_diagnostics(
self,
stage: StageConfig,
task: Task,
result: StageResult,
retry_count: int,
) -> str:
output = self._read_output(result.output_path)
if not output and not result.reason:
return ""
exit_code = _extract_exit_code(output) or _extract_exit_code(result.reason)
modified_files = self._modified_files()
classification = classify_failure(
"\n".join([result.reason, output]),
exit_code=exit_code,
modified_files=modified_files,
)
filename = f"diagnostics/{stage.id}-failure"
if retry_count:
filename += f"-retry-{retry_count}"
filename += ".md"
diagnostic_path = self.artifacts.write_stage_output(
task.id,
filename,
format_failure_classification(
classification,
exit_code=exit_code,
modified_files=modified_files,
),
)
if classification.category == "missing dependency":
dependency_path = self.artifacts.write_stage_output(
task.id,
"diagnostics/dependency-diagnostic.md",
format_dependency_diagnostic(
diagnose_python_dependencies(self.config.project.root, output)
),
)
return (
f"Failure classification: {classification.category}; "
f"diagnostic: {diagnostic_path.relative_to(self.config.project.root).as_posix()}; "
f"dependency diagnostic: {dependency_path.relative_to(self.config.project.root).as_posix()}."
)
return (
f"Failure classification: {classification.category}; "
f"root cause: {classification.probable_root_cause}; "
f"diagnostic: {diagnostic_path.relative_to(self.config.project.root).as_posix()}."
)
def _run_debugger_if_configured(
self,
task: Task,
result: StageResult,
retry_notes: list[str],
) -> str:
debugger_id = next(
(
agent_id
for agent_id, agent in self.config.agents.items()
if agent.role == "debugger" or agent_id == "debugger"
),
None,
)
if debugger_id is None:
return ""
stage = StageConfig(
id="debugger",
type="agent",
agent=debugger_id,
output="debugger.md",
)
output = self._read_output(result.output_path)
context = self.context.read_context(task, retry_notes)
debug_result = self.agent_executor.run_stage(
stage,
task,
{"failed_stage": result.reason, "failure_output": output},
retry_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context=context.retry_context,
)
return f"Debugger output: {debug_result.output_path or 'none'}."
def _rerun_malformed_review(
    self,
    stage: StageConfig,
    task: Task,
    malformed_result: StageResult,
    previous_outputs: dict[str, str],
    retry_notes: list[str],
    retry_count: int,
    project_context: str,
    task_context: str,
) -> StageResult:
    """Re-run a review stage once with strict formatting instructions.

    The rerun writes to the next attempt's output filename and receives
    compacted previous outputs plus an excerpt of the malformed review. If
    the rerun is well-formed its result is returned as is. If it is still
    malformed, a ``style_review`` stage is soft-passed when a continuity
    review already passed; any other case returns a ``fail`` result.
    """
    rerun_output = _attempt_filename(stage.output or f"{stage.id}.md", retry_count + 1)
    strict_stage = replace(self._stage_for_retry_agent(stage, retry_count), output=rerun_output)
    self.logger.event(
        "agent.rerun",
        "Re-running review after malformed output",
        stage_id=stage.id,
        task_id=task.id,
    )
    strict_notes = list(retry_notes)
    strict_notes.append(
        "Previous review output was malformed. Return exactly four lines: status, reason, next_stage, context_update. Do not return prose, headings, or analysis."
    )
    strict_outputs = _review_previous_outputs(previous_outputs)
    malformed_stdout = self._read_agent_stdout(malformed_result.output_path).strip()
    # Fall back to the raw artifact when no agent stdout could be recovered.
    malformed_text = malformed_stdout or self._read_output(malformed_result.output_path)
    strict_outputs["malformed_review_output"] = _compact_previous_output(
        malformed_text,
        max_chars=800,
    )
    rerun = self.agent_executor.run_stage(
        strict_stage,
        task,
        strict_outputs,
        strict_notes,
        project_context=project_context,
        task_context=task_context,
        retry_context="\n".join(f"- {note}" for note in strict_notes),
    )
    if not _is_malformed_review_result(rerun):
        return rerun
    if stage.id == "style_review" and _previous_continuity_review_passed(previous_outputs):
        return StageResult(
            rerun.stage_id,
            "pass",
            (
                "Style review output remained malformed after strict retry; "
                "continuing because continuity review passed and deterministic validators already ran."
            ),
            output_path=rerun.output_path,
            context_update="Style review was malformed twice; treated as soft-pass after continuity passed.",
        )
    return StageResult(
        rerun.stage_id,
        "fail",
        (
            "Review output remained malformed after a strict formatting retry. "
            "Stopping without redrafting; inspect the applied draft and review artifact."
        ),
        output_path=rerun.output_path,
        context_update=rerun.context_update,
    )
def _modified_files(self) -> tuple[str, ...]:
completed = subprocess.run(
["git", "status", "--short"],
cwd=self.config.project.root,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
if completed.returncode != 0:
return ()
files: list[str] = []
for line in completed.stdout.splitlines():
if len(line) > 3:
files.append(line[3:].strip())
return tuple(files)
def _telemetry_entry(
    self,
    stage: StageConfig,
    result: StageResult,
    retry_count: int,
) -> TelemetryEntry:
    """Build the telemetry entry for one stage attempt.

    The agent id and model come from the agent that the retry-adjusted
    stage resolves to; both are ``None`` when the stage has no agent or the
    agent id is not present in the configuration.
    """
    retry_stage = self._stage_for_retry_agent(stage, retry_count)
    agent = None
    if retry_stage.agent:
        agent = self.config.agents.get(retry_stage.agent)
    stage_output = self._read_output(result.output_path)
    agent_id = agent.id if agent else None
    model = agent.model if agent else None
    return telemetry_from_stage_output(
        stage_id=result.stage_id,
        stage_type=stage.type,
        status=result.status,
        output=stage_output,
        retry_count=retry_count,
        agent_id=agent_id,
        model=model,
    )
def _write_telemetry(self, task_id: str, entries: list[TelemetryEntry]) -> None:
    """Write the telemetry summary to the task's artifacts and to the run directory."""
    summary_name = "telemetry-summary.md"
    rendered = format_telemetry_summary(tuple(entries))
    self.artifacts.write_stage_output(task_id, summary_name, rendered)
    run_level_copy = self.artifacts.run_dir.joinpath(summary_name)
    run_level_copy.write_text(rendered, encoding="utf-8")
def _maybe_rerun_agent_with_repo_lookup(
self,
stage: StageConfig,
task: Task,
result: StageResult,
previous_outputs: dict[str, str],
retry_notes: list[str],
project_context: str,
task_context: str,
retry_context: str | None,
) -> StageResult:
if result.status != "pass" or result.output_path is None:
return result
requests = parse_lookup_requests(self._read_agent_stdout(result.output_path))
if not requests:
return result
lookup_context = self.repo_tools.execute_requests(
task.id,
requests,
filename="files-inspected.md",
)
self.logger.event(
"agent.rerun",
"Re-running agent with repo lookup context",
stage_id=stage.id,
task_id=task.id,
lookup_count=len(requests),
)
rerun_outputs = dict(previous_outputs)
rerun_outputs["repo_lookup_results"] = lookup_context
rerun_retry_notes = [
*retry_notes,
"Repository lookup results have been provided. Write the final plan now; do not request more lookups.",
]
rerun_result = self.agent_executor.run_stage(
stage,
task,
rerun_outputs,
rerun_retry_notes,
project_context=project_context,
task_context=task_context,
retry_context="\n".join(f"- {note}" for note in rerun_retry_notes),
)
return StageResult(
stage_id=rerun_result.stage_id,
status=rerun_result.status,
reason=(
"Agent completed after repo lookup."
if rerun_result.status == "pass"
else rerun_result.reason
),
output_path=rerun_result.output_path,
next_stage=rerun_result.next_stage,
context_update=rerun_result.context_update,
)
def _build_context_pack(self, task: Task) -> str:
    """Assemble the markdown context pack for ``task``.

    The pack contains the task id and title, its acceptance criteria, the
    scoped-path constraints, a file listing for each lookup path, and grep
    results for the first five search terms derived from the task. Lookup
    paths are the configured scoped paths, or ``"."`` when none are set.
    """
    terms = _task_search_terms(task)
    scoped_paths = self.config.safety.scoped_paths
    lookup_paths = scoped_paths or (".",)
    files = self._list_context_files(lookup_paths, task.id)

    search_lines: list[str] = []
    for term in terms[:5]:
        search_lines.extend([f"### Search: {term}", ""])
        for lookup_path in lookup_paths:
            hits = self.repo_tools.grep(re.escape(term), lookup_path, max_matches=20).rstrip()
            hits = _filter_future_task_test_lines(hits, task.id)
            search_lines.append(f"#### Path: {lookup_path}\n\n```text\n{hits}\n```")
        search_lines.append("")

    criteria = "\n".join(f"- {item}" for item in task.acceptance_criteria) or "- None"
    header = [
        "# Context Pack",
        "",
        f"Task: `{task.id}`",
        f"Title: {task.title}",
        "",
        "## Acceptance Criteria",
        "",
        criteria,
        "",
        "## Constraints",
        "",
        f"- Scoped paths: {', '.join(scoped_paths) or '.'}",
        "- Repository lookups are read-only.",
        "- Excerpts are line-numbered where files are read directly.",
        "",
        "## Relevant Files",
        "",
        "```text",
        files,
        "```",
        "",
        "## Search Results",
        "",
    ]
    return "\n".join(header + search_lines)
def _list_context_files(self, paths: tuple[str, ...], task_id: str) -> str:
sections: list[str] = []
for path in paths:
files = self.repo_tools.list_files(path, pattern="*", max_files=80).rstrip()
files = _filter_future_task_test_lines(files, task_id)
sections.extend(
[
f"## Path: {path}",
files,
"",
]
)
return "\n".join(sections).strip() or "No files found."
def _read_output(self, output_path: str | None) -> str:
if output_path is None:
return ""
path = self.config.project.root / Path(output_path)
if not path.exists():
return ""
return path.read_text(encoding="utf-8")
def _read_context_output(self, output_path: str | None) -> str:
stdout = self._read_agent_stdout(output_path)
return stdout if stdout else self._read_output(output_path)
def _read_agent_stdout(self, output_path: str | None) -> str:
if output_path is None:
return ""
path = self.config.project.root / Path(output_path)
json_path = _agent_invocation_json_path(path)
if json_path.exists():
try:
data = json.loads(json_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
data = {}
stdout = data.get("stdout")
if isinstance(stdout, str):
return stdout
return extract_agent_stdout(self._read_output(output_path))
def _format_retry_note(
self,
retry_count: int,
stage: StageConfig,
result: StageResult,
target_stage: str,
) -> str:
note = (
f"Retry {retry_count}: stage '{stage.id}' returned "
f"{result.status} ({result.reason}); redirecting to '{target_stage}'."
)
if (
target_stage == "update_state"
and "deletion-heavy patch" in result.reason.lower()
):
note = (
f"{note}\n"
"Repair guidance: preserve existing durable state text unless it directly conflicts "
"with the accepted scene. Make minimal additive edits instead of replacing whole "
"sections or compressing character/world files."
)
excerpt = self._failure_excerpt(result.output_path)
if not excerpt:
return note
return f"{note}\n\nRelevant failure output:\n```text\n{excerpt}\n```"
def _failure_excerpt(self, output_path: str | None, max_chars: int = 3500) -> str:
content = self._read_output(output_path)
if not content.strip():
return ""
cleaned_content = re.sub(r"\n{4,}", "\n\n\n", content.strip())
if len(cleaned_content) <= max_chars:
return cleaned_content
patterns = (
"error",
"fail",
"traceback",
"assertionerror",
"exception",
"exit code",
"stderr",
"stdout",
"timed out",
)
lines = content.splitlines()
selected = [
line
for line in lines
if any(pattern in line.lower() for pattern in patterns)
]
excerpt = "\n".join(selected).strip()
if len(excerpt) < 400:
excerpt = content.strip()
excerpt = re.sub(r"\n{4,}", "\n\n\n", excerpt)
if len(excerpt) <= max_chars:
return excerpt
return excerpt[:max_chars].rstrip() + "\n... <truncated>"
def format_summary_stage(
    task: Task,
    previous_outputs: dict[str, str],
    retry_notes: list[str],
) -> str:
    """Render the "Final Notes" markdown for a task.

    Lists the names of the previous stage outputs and the retry notes as
    bullet points; either list is replaced by ``- None`` when it is empty.
    """
    output_bullets = "\n".join([f"- {stage_id}" for stage_id in previous_outputs])
    retry_bullets = "\n".join([f"- {note}" for note in retry_notes])
    document = [
        "# Final Notes",
        "",
        f"Task: `{task.id}`",
        f"Title: {task.title}",
        "",
        "## Stage Outputs",
        "",
        output_bullets or "- None",
        "",
        "## Retry Notes",
        "",
        retry_bullets or "- None",
        "",
    ]
    return "\n".join(document)
def format_task_completion(task: Task, status: str, changed: bool) -> str:
    """Render the "Task Completion" markdown with the pipeline status and completion flag."""
    marked = str(changed).lower()
    return (
        "# Task Completion\n"
        "\n"
        f"Task: `{task.id}`\n"
        f"Pipeline status: {status}\n"
        f"Marked complete: {marked}\n"
    )
def format_implementation_summary(
    stage_id: str,
    patch_path: str,
    retry_count: int = 0,
    retry_notes: list[str] | None = None,
    candidate_index_path: str | None = None,
) -> str:
    """Render the "Implementation Summary" markdown for a writer stage.

    Args:
        stage_id: Id of the stage that produced the patch.
        patch_path: Path of the patch artifact, shown verbatim.
        retry_count: Repair attempt number shown in the summary.
        retry_notes: Retry feedback; only the last five notes are listed.
            ``None`` or an empty list renders ``- None``.
        candidate_index_path: Path of the candidate file index, or ``None``
            to render ``<none>``.

    Returns:
        The summary text, ending with a newline.
    """
    notes = retry_notes or []
    lines = [
        "# Implementation Summary",
        "",
        f"Stage: `{stage_id}`",
        "Status: pass",
        f"Repair attempt: {retry_count}",
        f"Patch: `{patch_path}`",
        f"Candidate files: `{candidate_index_path}`" if candidate_index_path else "Candidate files: <none>",
        "",
        "## Retry Feedback",
        "",
    ]
    # Use a statement-level branch; a conditional expression should not be
    # used only for its side effects.
    if notes:
        lines.extend(f"- {note}" for note in notes[-5:])
    else:
        lines.append("- None")
    lines.append("")
    return "\n".join(lines)
def _format_writer_context_update(
project_root: Path,
summary_path: Path,
warning_path: Path | None,
) -> str:
summary = summary_path.relative_to(project_root).as_posix()
if warning_path is None:
return f"Implementation summary: {summary}"
warnings = warning_path.relative_to(project_root).as_posix()
return f"Implementation summary: {summary}; writing warnings: {warnings}"
def _latest_patch_like_output(previous_outputs: dict[str, str]) -> str:
for name in ("normalized.patch", "applied.patch", "proposed.patch", "patch_input"):
if name in previous_outputs and previous_outputs[name].strip():
return previous_outputs[name]
for stage_id, content in reversed(list(previous_outputs.items())):
if stage_id.endswith(".patch") or "diff --git " in content or "\n--- " in content:
return content
raise PipelineError("Patch error: no previous patch output found.")
def _writer_patch_filename(stage: StageConfig, retry_count: int) -> str:
if retry_count <= 0:
return stage.output or "proposed.patch"
if stage.type == "code_writer" or stage.id == "implement":
return f"repair-{retry_count}.patch"
return _attempt_filename(stage.output or f"{stage.id}.patch", retry_count)
def _writer_summary_filename(stage: StageConfig, retry_count: int) -> str:
if stage.type == "code_writer" or stage.id == "implement":
return "implementation-summary.md" if retry_count <= 0 else f"repair-summary-{retry_count}.md"
base = f"{stage.id}-summary.md"
return base if retry_count <= 0 else _attempt_filename(base, retry_count)
def _attempt_filename(filename: str, retry_count: int) -> str:
if retry_count <= 0:
return filename
path = Path(filename)
suffix = "".join(path.suffixes)
if suffix:
stem = path.name[: -len(suffix)]
name = f"{stem}-{retry_count}{suffix}"
else:
name = f"{path.name}-{retry_count}"
return path.with_name(name).as_posix()
def _stage_with_attempt_output(stage: StageConfig, retry_count: int) -> StageConfig:
if retry_count <= 0:
return stage
output = _attempt_filename(stage.output or f"{stage.id}-output.txt", retry_count)
return replace(stage, output=output)
def _extract_exit_code(text: str) -> int | None:
match = re.search(r"Exit code:\s*(-?\d+)|code\s+(-?\d+)", text)
if not match:
return None
value = match.group(1) or match.group(2)
try:
return int(value)
except (TypeError, ValueError):
return None
def _task_semantic_results(index, query: str, task_id: str):
    """Return up to eight index items for ``query``, current task test first.

    Items whose path is the current task's test file are placed first. They
    are followed by search results, excluding those already included and
    those pointing at another task's test file.
    """
    current_test_path = _current_task_test_path(task_id)
    pinned = tuple(item for item in index if item.path == current_test_path)
    pinned_paths = {item.path for item in pinned}
    others = []
    for item in search_index(index, query, limit=8):
        if item.path in pinned_paths:
            continue
        if _future_task_test_path(item.path, current_test_path):
            continue
        others.append(item)
    combined = pinned + tuple(others)
    return combined[:8]
def _future_task_test_path(path: str, current_test_path: str) -> bool:
return bool(re.fullmatch(r"tests/test_task\d+\.py", path)) and path != current_test_path
def _current_task_test_path(task_id: str) -> str:
return f"tests/test_{task_id.lower().replace('-', '')}.py"
def _filter_future_task_test_lines(text: str, task_id: str) -> str:
    """Drop lines that mention only other tasks' test files.

    A line is removed when it references at least one
    ``tests/test_task<digits>.py`` path (backslashes are treated as forward
    slashes) and none of those references is the current task's test file.
    All other lines are kept unchanged and re-joined with newlines.
    """
    current_test_path = _current_task_test_path(task_id)

    def keep(line: str) -> bool:
        mentioned = re.findall(r"tests/test_task\d+\.py", line.replace("\\", "/"))
        return not mentioned or current_test_path in mentioned

    return "\n".join(line for line in text.splitlines() if keep(line))
def _invalid_file_writer_output_summary(output: str, reason: str, max_chars: int = 1200) -> str:
lines = [
f"Reason: {reason}",
f"Output length: {len(output)} characters",
]
lowered = output.lower()
if "```file:" in lowered or "```path:" in lowered:
lines.append("The output started a file block, but NightShift could not parse a complete closed block.")
else:
lines.append("The output did not contain a parseable file block.")
excerpt = output.strip()
if excerpt:
if len(excerpt) > max_chars:
excerpt = excerpt[:max_chars].rstrip() + "\n... <truncated>"
lines.extend(["", "Excerpt:", "```text", excerpt, "```"])
return "\n".join(lines)
def _is_malformed_review_result(result: StageResult) -> bool:
return result.status == "fail" and (
"Review output did not include a valid status" in result.reason
or "Review output remained malformed" in result.reason
)
def _resolve_retry_target_stage(stage: StageConfig, result: StageResult) -> str | None:
if stage.type in {"agent_review", "review"} and _is_malformed_review_result(result):
return None
return (stage.on_status or {}).get(result.status) or stage.on_fail or result.next_stage
def _previous_continuity_review_passed(previous_outputs: dict[str, str]) -> bool:
for name, output in previous_outputs.items():
if "continuity" in name and re.search(r"(?im)^status:\s*pass\s*$", output):
return True
return False
def _review_previous_outputs(previous_outputs: dict[str, str], max_chars: int = 1600) -> dict[str, str]:
    """Compact previous outputs for a review prompt, with a size budget per entry.

    Patch, draft and apply related outputs get ``max_chars``; the plan and
    context outputs get 500 characters; everything else gets 800.
    """
    priority_names = {
        "applied.patch",
        "normalized-draft.patch",
        "scene-draft.patch",
        "draft_scene",
        "apply_draft",
        "validate_draft",
        "test",
        "review",
    }
    background_names = {"plan", "semantic_context", "context"}

    def budget_for(name: str) -> int:
        if name in priority_names or name.endswith(".patch") or "draft" in name or "apply" in name:
            return max_chars
        return 500 if name in background_names else 800

    return {
        name: _compact_previous_output(output, max_chars=budget_for(name))
        for name, output in previous_outputs.items()
    }
def _file_writer_error_reason(stage: StageConfig, reason: str) -> str:
    """Append stage guidance to a "not allowed for this stage" error reason.

    Any other reason, or a stage with no guidance, is returned unchanged.
    """
    guidance = _file_writer_stage_guidance(stage)
    if guidance and "not allowed for this stage" in reason:
        return f"{reason} {guidance}"
    return reason
def _file_writer_stage_guidance(stage: StageConfig) -> str:
allowed = tuple(path.replace("\\", "/").rstrip("/") for path in stage.allowed_paths)
if allowed == ("story/chapters",):
return (
"This is the drafting stage: write only scene prose under `story/chapters/`. "
"Do not update plot state, characters, timeline, unresolved threads, or other story state files."
)
state_paths = {
"story/plot-state.md",
"story/characters.md",
"story/timeline.md",
"story/unresolved-threads.md",
}
if set(allowed).issubset(state_paths) and allowed:
return (
"This is the state update stage: update only durable story state files. "
"Do not rewrite scene prose or chapter files."
)
if allowed:
return "Return file blocks only for the allowed paths configured on this stage."
return ""
def _filter_allowed_file_updates(updates: tuple[FileUpdate, ...], stage: StageConfig) -> tuple[FileUpdate, ...]:
if not updates or not stage.allowed_paths:
return ()
allowed = tuple(path.replace("\\", "/").strip().strip("/") for path in stage.allowed_paths)
kept: list[FileUpdate] = []
for update in updates:
path = update.path.replace("\\", "/").strip().strip("/")
if any(path == root or path.startswith(root.rstrip("/") + "/") for root in allowed):
kept.append(update)
return tuple(kept)
def _file_writer_repair_format_note(stage: StageConfig) -> str:
    """Return the output-format reminder for a file-writer repair attempt.

    State update stages are told to use delimiter file blocks; every other
    stage is told to use complete fenced file blocks.
    """
    if not _is_state_update_stage(stage):
        return "Use complete fenced file blocks with both the opening ```file:path and closing ``` fence."
    return (
        "Use delimiter file blocks only: FILE: path, ---CONTENT---, complete file content, "
        "---END---. Do not use markdown code fences for state update output."
    )
def _candidate_artifact_name(path_text: str) -> str:
name = path_text.replace("\\", "/").strip().strip("/")
name = re.sub(r"[^A-Za-z0-9_.-]+", "_", name)
name = name.strip("._-")
return name or "candidate.txt"
def _file_writer_previous_outputs(
    previous_outputs: dict[str, str],
    retry_count: int,
    max_chars: int = 1200,
) -> dict[str, str]:
    """Prepare previous outputs for a file-writer prompt.

    Every output is first reduced with ``_compact_agent_artifact_output``.
    On retries (``retry_count`` above zero) the result is additionally
    compacted to ``max_chars``.
    """
    first_attempt = retry_count <= 0
    prepared: dict[str, str] = {}
    for name, output in previous_outputs.items():
        cleaned = _compact_agent_artifact_output(output)
        if first_attempt:
            prepared[name] = cleaned
        else:
            prepared[name] = _compact_previous_output(cleaned, max_chars=max_chars)
    return prepared
def _is_state_update_stage(stage: StageConfig) -> bool:
state_paths = {
"story/plot-state.md",
"story/characters.md",
"story/timeline.md",
"story/unresolved-threads.md",
}
allowed = {path.replace("\\", "/").rstrip("/") for path in stage.allowed_paths}
return stage.type == "file_writer" and bool(allowed) and allowed.issubset(state_paths)
def _is_scene_edit_stage(stage: StageConfig) -> bool:
allowed = {path.replace("\\", "/").rstrip("/") for path in stage.allowed_paths}
return stage.type == "file_writer" and stage.id.startswith("edit_") and "story/chapters" in allowed
def _is_writing_file_writer_stage(stage: StageConfig) -> bool:
allowed = {path.replace("\\", "/").rstrip("/") for path in stage.allowed_paths}
writing_paths = {
"story/chapters",
"story/plot-state.md",
"story/characters.md",
"story/timeline.md",
"story/unresolved-threads.md",
}
return stage.type == "file_writer" and bool(allowed & writing_paths)
def _task_story_chapter_paths(task: Task) -> tuple[str, ...]:
paths: list[str] = []
seen: set[str] = set()
for match in re.finditer(r"story/chapters/[^\s`]+?\.md", task.raw_markdown):
path = match.group(0).strip().strip("`")
if path not in seen:
paths.append(path)
seen.add(path)
return tuple(paths)
def _state_update_previous_outputs(previous_outputs: dict[str, str]) -> dict[str, str]:
    """Select and compact the previous outputs relevant to a state update.

    The four core draft/review outputs come first (1800 characters each,
    when non-empty). Any other output whose name mentions draft, review or
    apply follows at 1200 characters. The plan and context outputs are
    always left out.
    """

    def shrink(text: str, limit: int) -> str:
        return _compact_previous_output(_compact_agent_artifact_output(text), max_chars=limit)

    selected: dict[str, str] = {}
    for preferred in ("draft_scene", "apply_draft", "continuity_review", "style_review"):
        text = previous_outputs.get(preferred)
        if text:
            selected[preferred] = shrink(text, 1800)

    excluded = {"plan", "semantic_context", "context"}
    for name, text in previous_outputs.items():
        if name in selected or name in excluded:
            continue
        if any(marker in name for marker in ("draft", "review", "apply")):
            selected[name] = shrink(text, 1200)
    return selected
def _compact_agent_artifact_output(output: str) -> str:
if "# Agent Output:" not in output or "## Prompt" not in output:
return output
stdout = extract_agent_stdout(output).strip()
return stdout if stdout else output
def _agent_invocation_json_path(output_path: Path) -> Path:
if output_path.suffix:
return output_path.with_suffix(".json")
return output_path.with_name(output_path.name + ".json")
def _compact_previous_output(output: str, max_chars: int = 1200) -> str:
if len(output) <= max_chars:
return output
head_chars = max_chars // 2
tail_chars = max_chars - head_chars
return (
output[:head_chars].rstrip()
+ "\n\n... <previous output truncated for retry prompt> ...\n\n"
+ output[-tail_chars:].lstrip()
)
def _repeated_protected_path_violation(entries: tuple[RetryMemoryEntry, ...]) -> bool:
recent = entries[-2:]
if len(recent) < 2:
return False
return all(_is_protected_path_violation(entry.cause) for entry in recent)
def _is_protected_path_violation(text: str) -> bool:
lowered = text.lower()
return "not allowed for this stage" in lowered and "tests/" in lowered.replace("\\", "/")
def format_aggregate_run_summary(results: list[PipelineResult], status: str, reason: str) -> str:
    """Render the "Run Summary" markdown for a multi-task run.

    Counts tasks whose status is ``complete`` separately from all others and
    lists one bullet per task, or ``- None`` when there are no results.
    """
    completed = 0
    for result in results:
        if result.status == "complete":
            completed += 1
    failed = len(results) - completed
    task_bullets = [
        f"- `{result.task_id}`: {result.status} "
        f"(retries: {result.retry_count}) - {result.reason}"
        for result in results
    ]
    lines = [
        "# Run Summary",
        "",
        f"Status: {status}",
        f"Reason: {reason}",
        f"Tasks run: {len(results)}",
        f"Completed tasks: {completed}",
        f"Failed tasks: {failed}",
        "",
        "## Tasks",
        "",
        *(task_bullets or ["- None"]),
        "",
    ]
    return "\n".join(lines)
def format_run_metadata(config: NightShiftConfig) -> str:
    """Render the "Run Metadata" markdown: project, experiment settings and agents.

    Unset optional values are shown as empty strings. A temperature is only
    blanked when it is ``None``, so ``0`` is still printed.
    """

    def shown(value):
        return value or ""

    sections = [
        "# Run Metadata",
        "",
        f"Project: {config.project.name}",
        f"Experiment label: {shown(config.experiment.label)}",
        f"Prompt variant: {shown(config.experiment.prompt_variant)}",
        "",
        "## Agents",
        "",
    ]
    for agent in config.agents.values():
        temperature = "" if agent.temperature is None else agent.temperature
        sections += [
            f"### {agent.id}",
            "",
            f"- Backend: {agent.backend}",
            f"- Model: {shown(agent.model)}",
            f"- Temperature: {temperature}",
            f"- Base URL: {shown(agent.base_url)}",
            f"- Command: {shown(agent.command)}",
            f"- System prompt: {agent.system_prompt}",
            "",
        ]
    return "\n".join(sections)
def _task_search_terms(task: Task) -> list[str]:
source = " ".join([task.id, task.title, *task.acceptance_criteria])
words = re.findall(r"[A-Za-z_][A-Za-z0-9_]{2,}", source)
ignored = {
"the",
"and",
"for",
"with",
"that",
"this",
"task",
"add",
"use",
"can",
"should",
"must",
}
terms: list[str] = []
for word in words:
lowered = word.lower()
if lowered in ignored or lowered in terms:
continue
terms.append(lowered)
return terms or [task.id]