nightshift/tests/test_pipeline.py
K. Hodges e1e6803eb1 Clean up docs, tests, patch writing bug
Checked out commit from rsarv3006 which is super interesting, grabbed some inspiration from it and mentioned it in the ideas file.
2026-05-22 21:04:54 -07:00

1500 lines
66 KiB
Python

from pathlib import Path
from dataclasses import replace
import tempfile
import unittest
from nightshift.artifacts import ArtifactStore
from nightshift.config import (
AgentConfig,
NightShiftConfig,
PipelineConfig,
ProjectConfig,
SafetyConfig,
StageConfig,
)
from nightshift.pipeline import PipelineRunner, _file_writer_previous_outputs
from nightshift.stages import StageResult
from nightshift.tasks import parse_tasks
# Markdown task list containing one unchecked task (TASK-001) with a
# description and two acceptance criteria. The tests in this module feed it to
# parse_tasks() and run the first parsed task through the pipeline.
TASK_MD = """# Tasks
- [ ] TASK-001: Run fake pipeline
Description:
Exercise a fake pipeline.
Acceptance Criteria:
- Happy path completes
- Artifacts are written
"""
def make_config(root: Path, stages: tuple[StageConfig, ...], max_retries: int = 2) -> NightShiftConfig:
    """Assemble a NightShiftConfig for tests, rooted at ``root``.

    Args:
        root: Project root; ``nightshift.yaml`` is placed directly under it.
        stages: Stage definitions to install in the pipeline config.
        max_retries: Value used for ``max_task_retries``.

    Returns:
        A config with three command-backed agents ("planner", "reviewer",
        "retry_reviewer"), each of which prints a canned response.
    """

    def command_agent(agent_id: str, command: str, prompt_name: str) -> AgentConfig:
        # Every agent in this fixture uses the "command" backend.
        return AgentConfig(
            id=agent_id,
            backend="command",
            command=command,
            system_prompt=Path(prompt_name),
        )

    project = ProjectConfig(
        name="test",
        root=root,
        task_file=Path("tasks.md"),
        artifact_dir=Path(".nightshift"),
    )
    safety = SafetyConfig(
        require_clean_worktree=False,
        scoped_paths=(".",),
        allowed_commands=('python -c "print(\'tests ok\')"',),
        forbidden_commands=("rm -rf",),
    )
    agents = {
        "planner": command_agent(
            "planner",
            'python -c "print(\'plan ok\')"',
            "planner.md",
        ),
        "reviewer": command_agent(
            "reviewer",
            'python -c "print(\'status: pass\\nreason: ok\')"',
            "reviewer.md",
        ),
        "retry_reviewer": command_agent(
            "retry_reviewer",
            'python -c "print(\'status: retry\\nreason: retry it\\nnext_stage: implement\')"',
            "reviewer.md",
        ),
    }
    pipeline = PipelineConfig(max_task_retries=max_retries, stages=stages)
    return NightShiftConfig(
        path=root / "nightshift.yaml",
        project=project,
        safety=safety,
        agents=agents,
        pipeline=pipeline,
    )
class PipelineRunnerTests(unittest.TestCase):
def test_happy_path_pipeline_completes_and_writes_artifacts(self) -> None:
    """Run plan, test, review and summarize stages and check the written artifacts."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        pipeline_stages = (
            StageConfig(id="plan", type="agent", agent="planner", output="plan.md"),
            StageConfig(
                id="test",
                type="command",
                commands=('python -c "print(\'tests ok\')"',),
                output="test-output.txt",
            ),
            StageConfig(id="review", type="agent_review", agent="reviewer", output="review.md"),
            StageConfig(id="summarize", type="summarize", output="final-notes.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(make_config(repo, pipeline_stages), store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.status, "complete")
        self.assertEqual(result.retry_count, 0)

        run_dir = repo / ".nightshift" / "runs" / "test-run"
        task_dir = run_dir / "tasks" / task.id
        expected_artifacts = (
            task_dir / "plan.md",
            task_dir / "stage-results.md",
            run_dir / "prompts" / "planner.md",
            run_dir / "run-metadata.md",
            task_dir / "context.md",
            task_dir / "context-out.md",
        )
        for artifact in expected_artifacts:
            self.assertTrue(artifact.exists())

        plan_text = (task_dir / "plan.md").read_text(encoding="utf-8")
        self.assertIn("## Task Context", plan_text)
        summary_text = (run_dir / "run-summary.md").read_text(encoding="utf-8")
        self.assertIn("Modified Files", summary_text)
def test_on_pass_jumps_to_configured_stage(self) -> None:
    """A stage with on_pass="third" skips the stage that sits between them."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        first = StageConfig(id="first", type="agent", agent="planner", output="first.md", on_pass="third")
        skipped = StageConfig(
            id="second",
            type="command",
            commands=('python -c "print(\'should not run\')"',),
            output="second-output.txt",
        )
        last = StageConfig(id="third", type="summarize", output="final-notes.md")
        config = make_config(repo, (first, skipped, last))
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
        self.assertEqual(result.status, "complete")
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["first", "third"])
        # The skipped command stage must not have produced its output file.
        self.assertFalse((task_dir / "second-output.txt").exists())
def test_task_preflight_fails_when_task_specific_test_file_is_missing(self) -> None:
    """A command that references a per-task test file fails when that file is absent."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        test_stage = StageConfig(
            id="test",
            type="command",
            commands=("python -m pytest -q tests/test_{task_id_compact}.py",),
            output="test-output.txt",
        )
        config = make_config(repo, (test_stage,), max_retries=0)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.status, "failed")
        self.assertIn("configured task test file is missing", result.reason)
        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id
        preflight_text = (task_dir / "preflight.md").read_text(encoding="utf-8")
        self.assertIn("tests/test_task001.py", preflight_text)
def test_review_can_retry_implementation_until_limit(self) -> None:
    """A reviewer that always asks for a retry exhausts the retry budget of 2."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        implement = StageConfig(id="implement", type="agent", agent="planner", output="implementation-log.md")
        review = StageConfig(
            id="review",
            type="agent_review",
            agent="retry_reviewer",
            on_fail="implement",
            output="review.md",
        )
        config = make_config(repo, (implement, review), max_retries=2)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.status, "failed")
        self.assertEqual(result.retry_count, 2)
        self.assertIn("Retry limit reached", result.reason)
        # Initial attempt plus two retries: three implement/review rounds.
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["implement", "review"] * 3)
def test_failing_review_self_next_stage_routes_to_on_fail(self) -> None:
    """A failing review that names itself as next_stage is routed to "implement"."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        base_config = make_config(repo, (), max_retries=1)
        # Replace the stock reviewer with one that fails and requests "review" again.
        base_config.agents["reviewer"] = AgentConfig(
            id="reviewer",
            backend="command",
            command=(
                "python -c \"print('status: fail\\nreason: needs draft repair\\n"
                "next_stage: review\\ncontext_update: add concrete details')\""
            ),
            system_prompt=Path("reviewer.md"),
        )
        pipeline = PipelineConfig(
            max_task_retries=1,
            stages=(
                StageConfig(id="implement", type="agent", agent="planner", output="implementation-log.md"),
                StageConfig(
                    id="review",
                    type="agent_review",
                    agent="reviewer",
                    on_fail="implement",
                    output="review.md",
                ),
            ),
        )
        config = replace(base_config, pipeline=pipeline)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.retry_count, 1)
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["implement", "review", "implement", "review"])
        run_log = (repo / ".nightshift" / "runs" / "test-run" / "run.log").read_text(encoding="utf-8")
        self.assertIn("next_stage=implement", run_log)
def test_malformed_review_gets_strict_retry_without_redrafting(self) -> None:
    """A malformed review is re-asked once; the implement stage is not re-run."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        # Fake reviewer: malformed on the first call, well-formed once the
        # prompt mentions the malformed previous output.
        reviewer_script = [
            "import sys",
            "prompt = sys.stdin.read()",
            "if 'Previous review output was malformed' in prompt:",
            " print('status: pass')",
            " print('reason: strict retry ok')",
            " print('next_stage: none')",
            " print('context_update: none')",
            "else:",
            " print('files')",
        ]
        (repo / "fake_reviewer.py").write_text("\n".join(reviewer_script), encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="implement", type="agent", agent="planner", output="implementation-log.md"),
            StageConfig(
                id="review",
                type="agent_review",
                agent="reviewer",
                on_fail="implement",
                output="review.md",
            ),
            StageConfig(id="summarize", type="summarize", output="final-notes.md"),
        )
        config = make_config(repo, pipeline_stages, max_retries=2)
        config.agents["reviewer"] = AgentConfig(
            id="reviewer",
            backend="command",
            command="python fake_reviewer.py",
            system_prompt=Path("reviewer.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id
        first_review = task_dir / "review.md"
        second_review = task_dir / "review-1.md"
        self.assertEqual(result.status, "complete")
        self.assertEqual(result.retry_count, 0)
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["implement", "review", "summarize"])
        self.assertTrue(first_review.exists())
        self.assertTrue(second_review.exists())
        self.assertIn("files", first_review.read_text(encoding="utf-8"))
        self.assertIn("strict retry ok", second_review.read_text(encoding="utf-8"))
def test_malformed_review_retry_uses_stdout_summary_not_full_prompt_artifact(self) -> None:
    """The retry prompt quotes the malformed stdout and has no "## Prompt" section."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        # The fake reviewer saves the retry prompt to disk so it can be inspected.
        reviewer_script = [
            "import sys",
            "prompt = sys.stdin.read()",
            "if 'Previous review output was malformed' in prompt:",
            " open('retry-prompt.txt', 'w', encoding='utf-8').write(prompt)",
            " print('status: pass')",
            " print('reason: strict retry ok')",
            " print('next_stage:')",
            " print('context_update:')",
            "else:",
            " print('No extra text. No JSON.')",
        ]
        (repo / "fake_reviewer.py").write_text("\n".join(reviewer_script), encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="implement", type="agent", agent="planner", output="implementation-log.md"),
            StageConfig(id="review", type="agent_review", agent="reviewer", output="review.md"),
        )
        config = make_config(repo, pipeline_stages, max_retries=1)
        config.agents["reviewer"] = AgentConfig(
            id="reviewer",
            backend="command",
            command="python fake_reviewer.py",
            system_prompt=Path("reviewer.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        captured_prompt = (repo / "retry-prompt.txt").read_text(encoding="utf-8")
        self.assertEqual(result.status, "complete")
        self.assertIn("malformed_review_output", captured_prompt)
        self.assertIn("No extra text. No JSON.", captured_prompt)
        self.assertNotIn("## Prompt", captured_prompt)
def test_malformed_review_stops_without_on_fail_redraft(self) -> None:
    """A review that stays malformed fails the task without re-running implement."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        # This reviewer always prints malformed output.
        (repo / "fake_reviewer.py").write_text("print('files')\n", encoding="utf-8")
        implement = StageConfig(id="implement", type="agent", agent="planner", output="implementation-log.md")
        review = StageConfig(
            id="review",
            type="agent_review",
            agent="reviewer",
            on_fail="implement",
            output="review.md",
        )
        config = make_config(repo, (implement, review), max_retries=2)
        config.agents["reviewer"] = AgentConfig(
            id="reviewer",
            backend="command",
            command="python fake_reviewer.py",
            system_prompt=Path("reviewer.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id
        self.assertEqual(result.status, "failed")
        self.assertEqual(result.retry_count, 0)
        self.assertIn("remained malformed", result.reason)
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["implement", "review"])
        for review_file in ("review.md", "review-1.md"):
            self.assertTrue((task_dir / review_file).exists())
def test_malformed_style_review_soft_passes_after_continuity_pass(self) -> None:
    """A malformed style review after a passing continuity review still completes."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "fake_style.py").write_text("print('No extra text. No JSON.')\n", encoding="utf-8")
        continuity = StageConfig(
            id="continuity_review", type="agent_review", agent="reviewer", output="continuity-review.md"
        )
        style = StageConfig(id="style_review", type="agent_review", agent="style", output="style-review.md")
        summarize = StageConfig(id="summarize", type="summarize", output="final-notes.md")
        config = make_config(repo, (continuity, style, summarize), max_retries=1)
        config.agents["style"] = AgentConfig(
            id="style",
            backend="command",
            command="python fake_style.py",
            system_prompt=Path("reviewer.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        self.assertEqual(result.status, "complete")
        style_result = result.stage_results[1]
        self.assertIn("Style review output remained malformed", style_result.reason)
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["continuity_review", "style_review", "summarize"])
def test_passing_review_next_stage_is_ignored(self) -> None:
    """A passing review's next_stage value is ignored and logged as such."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        base_config = make_config(repo, (), max_retries=0)
        # Reviewer passes but also emits a next_stage line.
        noisy_reviewer = replace(
            base_config.agents["reviewer"],
            command='python -c "print(\'status: pass\\nreason: ok\\nnext_stage: TASK-002\')"',
        )
        pipeline = PipelineConfig(
            max_task_retries=0,
            stages=(
                StageConfig(id="review", type="agent_review", agent="reviewer", output="review.md"),
                StageConfig(id="summarize", type="summarize", output="final-notes.md"),
            ),
        )
        config = replace(
            base_config,
            agents={**base_config.agents, "reviewer": noisy_reviewer},
            pipeline=pipeline,
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.status, "complete")
        executed = [entry.stage_id for entry in result.stage_results]
        self.assertEqual(executed, ["review", "summarize"])
        run_log = (repo / ".nightshift" / "runs" / "test-run" / "run.log").read_text(encoding="utf-8")
        self.assertIn("stage.next_ignored", run_log)
def test_stage_error_is_reported_as_failed_result(self) -> None:
    """A stage configured with output "../bad.md" yields a failed task result."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        bad_stage = StageConfig(id="plan", type="agent", agent="planner", output="../bad.md")
        config = make_config(repo, (bad_stage,))
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.status, "failed")
        self.assertEqual(result.stage_results[0].status, "fail")
        # Final notes are expected even though the only stage failed.
        final_notes = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id / "final-notes.md"
        self.assertTrue(final_notes.exists())
def test_successful_task_is_marked_complete_and_git_artifacts_exist(self) -> None:
    """A completed task is checked off in tasks.md and leaves git/diff artifacts."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        plan_stage = StageConfig(id="plan", type="agent", agent="planner", output="plan.md")
        config = make_config(repo, (plan_stage,))
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        self.assertEqual(result.status, "complete")
        task_file_text = (repo / "tasks.md").read_text(encoding="utf-8")
        self.assertIn("- [x] TASK-001", task_file_text)
        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id
        expected_names = (
            "task-completion.md",
            "git-status-before.txt",
            "git-status-after.txt",
            "diff.patch",
        )
        for artifact_name in expected_names:
            self.assertTrue((task_dir / artifact_name).exists())
def test_multi_task_run_writes_aggregate_summary_and_stops_on_failure(self) -> None:
    """After the first task fails, the second is not run and the summary says so."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        # Append a second task to the shared single-task fixture.
        two_tasks_md = TASK_MD + """
- [ ] TASK-002: Second task
Description:
Should not run after failure.
Acceptance Criteria:
- skipped
"""
        (repo / "tasks.md").write_text(two_tasks_md, encoding="utf-8")
        failing_stage = StageConfig(
            id="test",
            type="command",
            commands=('python -c "print(\'missing\')"',),
            output="../bad.txt",
        )
        config = make_config(repo, (failing_stage,), max_retries=0)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        parsed_tasks = parse_tasks(two_tasks_md)

        result = runner.run_tasks(parsed_tasks)

        self.assertEqual(result.status, "failed")
        self.assertEqual(len(result.task_results), 1)
        summary_path = repo / ".nightshift" / "runs" / "test-run" / "run-summary.md"
        summary_text = summary_path.read_text(encoding="utf-8")
        self.assertIn("Tasks run: 1", summary_text)
def test_multi_task_run_blocks_incomplete_dependency(self) -> None:
    """A task that depends on a not-yet-complete task is reported as blocked."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        # TASK-001 depends on TASK-002, which appears later and is unchecked.
        dependent_tasks_md = """# Tasks
- [ ] TASK-001: Blocked
Dependencies:
- TASK-002
Acceptance Criteria:
- blocked
- [ ] TASK-002: Later
Acceptance Criteria:
- later
"""
        (repo / "tasks.md").write_text(dependent_tasks_md, encoding="utf-8")
        config = make_config(repo, (), max_retries=0)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_tasks(parse_tasks(dependent_tasks_md))

        self.assertEqual(result.status, "failed")
        first_task_result = result.task_results[0]
        self.assertEqual(first_task_result.status, "blocked")
def test_run_writes_operational_log(self) -> None:
    """run.log loses its earlier content and gains task/stage/agent events."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        plan_stage = StageConfig(id="plan", type="agent", agent="planner", output="plan.md")
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        config = make_config(repo, (plan_stage,))
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]
        # Seed the log with stale content that must not survive the run.
        store.initialize_run()
        store.run_log_path.write_text("old run log\n", encoding="utf-8")

        runner.run_task(task)

        run_log = (repo / ".nightshift" / "runs" / "test-run" / "run.log").read_text(encoding="utf-8")
        self.assertNotIn("old run log", run_log)
        for event in ("task.start", "stage.start", "agent.finish"):
            self.assertIn(event, run_log)
def test_planner_lookup_requests_write_files_inspected_and_rerun(self) -> None:
    """A planner lookup request is recorded and the planner is re-run with results."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "target.py").write_text("VALUE = 1\n", encoding="utf-8")
        # Fake planner: requests a file read first, then produces the final
        # plan once the prompt contains lookup results.
        planner_script = [
            "import sys",
            "prompt = sys.stdin.read()",
            "if 'repo_lookup_results' in prompt:",
            " print('final plan with context')",
            "else:",
            " print('lookup_requests:')",
            " print('- tool: read_file')",
            " print(' path: target.py')",
        ]
        (repo / "fake_planner.py").write_text("\n".join(planner_script), encoding="utf-8")
        plan_stage = StageConfig(id="plan", type="agent", agent="planner", output="plan.md")
        config = make_config(repo, (plan_stage,))
        config.agents["planner"] = AgentConfig(
            id="planner",
            backend="command",
            command="python fake_planner.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id
        inspected = task_dir / "files-inspected.md"
        self.assertEqual(result.status, "complete")
        self.assertTrue(inspected.exists())
        self.assertIn("1: VALUE = 1", inspected.read_text(encoding="utf-8"))
        plan_text = (task_dir / "plan.md").read_text(encoding="utf-8")
        self.assertIn("final plan with context", plan_text)
def test_repo_context_stage_writes_context_pack(self) -> None:
    """The repo_context stage writes a context pack that mentions app.py."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "app.py").write_text("def run_pipeline():\n return True\n", encoding="utf-8")
        context_stage = StageConfig(id="context", type="repo_context", output="context-pack.md")
        config = make_config(repo, (context_stage,))
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        pack_path = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id / "context-pack.md"
        self.assertEqual(result.status, "complete")
        pack_text = pack_path.read_text(encoding="utf-8")
        self.assertIn("Context Pack", pack_text)
        self.assertIn("app.py", pack_text)
def test_repo_context_stage_respects_scoped_paths_without_project_root(self) -> None:
    """With scoped_paths that omit ".", files under src/ and tests/ are still listed."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        source_dir = repo / "src"
        tests_dir = repo / "tests"
        source_dir.mkdir()
        tests_dir.mkdir()
        (source_dir / "app.py").write_text("def create_snippet():\n return True\n", encoding="utf-8")
        (tests_dir / "test_app.py").write_text("def test_create_snippet():\n assert True\n", encoding="utf-8")
        context_stage = StageConfig(id="context", type="repo_context", output="context-pack.md")
        base_config = make_config(repo, (context_stage,))
        # Narrow the safety scope to explicit paths; command lists are carried over.
        scoped_safety = SafetyConfig(
            require_clean_worktree=False,
            scoped_paths=("src", "tests", "pyproject.toml", "README.md"),
            allowed_commands=base_config.safety.allowed_commands,
            forbidden_commands=base_config.safety.forbidden_commands,
        )
        config = replace(base_config, safety=scoped_safety)
        (repo / "pyproject.toml").write_text("[project]\nname = 'demo'\n", encoding="utf-8")
        (repo / "README.md").write_text("# Demo\n", encoding="utf-8")
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        pack_path = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id / "context-pack.md"
        self.assertEqual(result.status, "complete")
        pack_text = pack_path.read_text(encoding="utf-8")
        for listed_file in ("src/app.py", "tests/test_app.py"):
            self.assertIn(listed_file, pack_text)
def test_project_context_chart_is_written_during_run(self) -> None:
    """Running a task writes project-context-chart.md listing cli.py and main@L1."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        cli_source = "def main():\n return 0\n\nif __name__ == \"__main__\":\n main()\n"
        (repo / "cli.py").write_text(cli_source, encoding="utf-8")
        plan_stage = StageConfig(id="plan", type="agent", agent="planner", output="plan.md")
        config = make_config(repo, (plan_stage,))
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        runner.run_task(task)

        chart_path = repo / ".nightshift" / "project-context-chart.md"
        self.assertTrue(chart_path.exists())
        chart_text = chart_path.read_text(encoding="utf-8")
        self.assertIn("cli.py", chart_text)
        self.assertIn("main@L1", chart_text)
def test_retry_note_keeps_small_failure_output_unfiltered(self) -> None:
    """The retry note for a short failing test output includes its non-error lines."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        config = make_config(repo, ())
        runner = PipelineRunner(config, store)
        captured_output = "\n".join(
            [
                "# Command Output: test",
                "",
                "### stdout",
                "",
                "```text",
                "def test_board_route(self):",
                " response = self.client.get('/board/general')",
                " self.assertEqual(response.status_code, 200)",
                "E AssertionError: 404 != 200",
                "```",
                "",
            ]
        )
        output_path = store.write_stage_output("TASK-001", "test-output.txt", captured_output)
        relative_output = str(output_path.relative_to(repo))
        failing_stage = StageConfig(id="test", type="command", on_fail="write")
        failure = StageResult(
            stage_id="test",
            status="fail",
            reason="Command exited with code 1: python -m pytest -q",
            output_path=relative_output,
        )

        note = runner._format_retry_note(1, failing_stage, failure, "write")

        self.assertIn("response = self.client.get('/board/general')", note)
        self.assertIn("self.assertEqual(response.status_code, 200)", note)
def test_state_update_retry_note_guides_deletion_heavy_repairs(self) -> None:
    """A deletion-heavy validation failure yields a note urging minimal additive edits."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        config = make_config(repo, ())
        runner = PipelineRunner(config, store)
        output_path = store.write_stage_output(
            "TASK-001",
            "state-validation.md",
            "# Patch Validation\n\nStatus: fail\nReason: Patch validation failed: deletion-heavy patch exceeds max_delete_ratio 0.35.\n",
        )
        validator_stage = StageConfig(id="validate_state", type="patch_validator", on_fail="update_state")
        failure = StageResult(
            stage_id="validate_state",
            status="fail",
            reason="Patch validation failed: deletion-heavy patch exceeds max_delete_ratio 0.35.",
            output_path=str(output_path.relative_to(repo)),
        )

        note = runner._format_retry_note(1, validator_stage, failure, "update_state")

        for expected_phrase in ("preserve existing durable state text", "minimal additive edits"):
            self.assertIn(expected_phrase, note)
def test_code_writer_normalizer_and_validator_pipeline(self) -> None:
    """A fenced diff from the writer flows through normalize and validate stages."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "app.py").write_text("old\n", encoding="utf-8")
        # Fake writer prints a one-line replacement diff inside a ```diff fence.
        writer_script = [
            "print('```diff')",
            "print('diff --git a/app.py b/app.py')",
            "print('--- a/app.py')",
            "print('+++ b/app.py')",
            "print('@@ -1 +1 @@')",
            "print('-old')",
            "print('+new')",
            "print('```')",
        ]
        (repo / "fake_writer.py").write_text("\n".join(writer_script), encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="context", type="repo_context", output="context-pack.md"),
            StageConfig(id="write", type="code_writer", agent="writer"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator"),
        )
        config = make_config(repo, pipeline_stages)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(task)

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / task.id
        self.assertEqual(result.status, "complete")
        for artifact_name in ("proposed.patch", "implementation-summary.md", "normalized.patch"):
            self.assertTrue((task_dir / artifact_name).exists())
        validation_text = (task_dir / "patch-validation.md").read_text(encoding="utf-8")
        self.assertIn("Status: pass", validation_text)
def test_code_writer_lookup_requests_are_rerun_with_context(self) -> None:
    """A code writer that first asks for a lookup is re-run and then emits a diff."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "app.py").write_text("old\n", encoding="utf-8")
        # Fake writer: lookup request first, diff once lookup results are in the prompt.
        writer_script = [
            "import sys",
            "prompt = sys.stdin.read()",
            "if 'repo_lookup_results' in prompt:",
            " print('diff --git a/app.py b/app.py')",
            " print('--- a/app.py')",
            " print('+++ b/app.py')",
            " print('@@ -1 +1 @@')",
            " print('-old')",
            " print('+new')",
            "else:",
            " print('lookup_requests:')",
            " print('- tool: read_file')",
            " print(' path: app.py')",
        ]
        (repo / "fake_writer.py").write_text("\n".join(writer_script), encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="write", type="code_writer", agent="writer"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator"),
        )
        config = make_config(repo, pipeline_stages)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
        self.assertEqual(result.status, "complete")
        self.assertTrue((task_dir / "implementation-files-inspected.md").exists())
        patch_text = (task_dir / "proposed.patch").read_text(encoding="utf-8")
        self.assertIn("diff --git a/app.py b/app.py", patch_text)
def test_file_writer_generates_patch_from_file_blocks(self) -> None:
    """```file: blocks from the writer become a patch covering both listed files."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "app.py").write_text("old\n", encoding="utf-8")
        # Fake writer emits one block for an existing file and one for a new file.
        writer_script = [
            "print('```file:app.py')",
            "print('new')",
            "print('```')",
            "print('```file:tests/test_app.py')",
            "print('def test_app():')",
            "print(' assert True')",
            "print('```')",
        ]
        (repo / "fake_writer.py").write_text("\n".join(writer_script), encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="write", type="file_writer", agent="writer"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator"),
        )
        config = make_config(repo, pipeline_stages)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
        patch_path = task_dir / "proposed.patch"
        agent_output = task_dir / "write-agent-output.md"
        self.assertEqual(result.status, "complete")
        self.assertTrue(agent_output.exists())
        patch_text = patch_path.read_text(encoding="utf-8")
        self.assertIn("diff --git a/app.py b/app.py", patch_text)
        self.assertIn("diff --git a/tests/test_app.py b/tests/test_app.py", patch_text)
        candidate_index = task_dir / "candidate-files" / "write" / "index.md"
        self.assertTrue(candidate_index.exists())
        self.assertIn("app.py", candidate_index.read_text(encoding="utf-8"))
def test_file_writer_ignores_disallowed_blocks_when_allowed_candidate_exists(self) -> None:
    """Blocks outside allowed_paths are left out of the patch but kept as candidates."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "story" / "chapters").mkdir(parents=True)
        # First block is inside allowed_paths; the second one is not.
        writer_script = [
            "print('```file:story/chapters/scene.md')",
            "print('scene prose')",
            "print('```')",
            "print('```file:story/plot-state.md')",
            "print('state')",
            "print('```')",
        ]
        (repo / "fake_writer.py").write_text("\n".join(writer_script), encoding="utf-8")
        draft_stage = StageConfig(
            id="draft_scene",
            type="file_writer",
            agent="writer",
            allowed_paths=("story/chapters",),
        )
        config = make_config(repo, (draft_stage,), max_retries=0)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
        candidates_dir = task_dir / "candidate-files" / "draft_scene"
        accepted = candidates_dir / "001-story_chapters_scene.md"
        rejected = candidates_dir / "002-story_plot-state.md"
        patch_path = task_dir / "proposed.patch"
        self.assertEqual(result.status, "complete")
        self.assertTrue(patch_path.exists())
        patch_text = patch_path.read_text(encoding="utf-8")
        self.assertIn("story/chapters/scene.md", patch_text)
        self.assertNotIn("story/plot-state.md", patch_text)
        self.assertTrue(accepted.exists())
        self.assertTrue(rejected.exists())
        self.assertEqual(accepted.read_text(encoding="utf-8"), "scene prose\n")
def test_file_writer_accepts_unified_diff_fallback(self) -> None:
    """A raw unified diff from a file_writer agent is accepted as the patch."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        (repo / "app.py").write_text("old\n", encoding="utf-8")
        # The hunk header claims "+1,4" although only one line is added; the
        # assertion below expects "@@ -1 +1 @@" in the stored patch.
        writer_script = [
            "print('diff --git a/app.py b/app.py')",
            "print('--- a/app.py')",
            "print('+++ b/app.py')",
            "print('@@ -1 +1,4 @@')",
            "print('-old')",
            "print('+new')",
        ]
        (repo / "fake_writer.py").write_text("\n".join(writer_script), encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="write", type="file_writer", agent="writer"),
            StageConfig(id="validate", type="patch_validator"),
        )
        config = make_config(repo, pipeline_stages)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        patch_path = repo / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001" / "proposed.patch"
        self.assertEqual(result.status, "complete")
        patch_text = patch_path.read_text(encoding="utf-8")
        self.assertIn("@@ -1 +1 @@", patch_text)
def test_file_writer_no_changes_skips_patch_stages_and_runs_tests(self) -> None:
    """When the writer's file matches disk, patch artifacts are absent but tests run."""
    with tempfile.TemporaryDirectory() as workdir:
        repo = Path(workdir)
        _write_common_files(repo)
        # app.py already holds exactly what the fake writer will emit.
        (repo / "app.py").write_text("new\n", encoding="utf-8")
        writer_script = [
            "print('```file:app.py')",
            "print('new')",
            "print('```')",
        ]
        (repo / "fake_writer.py").write_text("\n".join(writer_script), encoding="utf-8")
        test_command = 'python -c "from pathlib import Path; raise SystemExit(0 if Path(\'app.py\').read_text() == \'new\\n\' else 1)"'
        pipeline_stages = (
            StageConfig(id="write", type="file_writer", agent="writer"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator"),
            StageConfig(id="apply", type="patch_apply", mode="apply"),
            StageConfig(id="test", type="command", commands=(test_command,), output="test-output.txt"),
        )
        base_config = make_config(repo, pipeline_stages)
        # The custom test command has to be on the allow-list.
        permissive_safety = SafetyConfig(
            require_clean_worktree=False,
            scoped_paths=(".",),
            allowed_commands=(test_command,),
            forbidden_commands=("rm -rf",),
        )
        config = replace(base_config, safety=permissive_safety)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        store = ArtifactStore(repo, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)

        result = runner.run_task(parse_tasks(TASK_MD)[0])

        task_dir = repo / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
        self.assertEqual(result.status, "complete")
        self.assertTrue((task_dir / "test-output.txt").exists())
        for skipped_artifact in ("normalized.patch", "patch-validation.md"):
            self.assertFalse((task_dir / skipped_artifact).exists())
def test_file_writer_invalid_output_gets_strict_rerun(self) -> None:
    # The fake writer prints a plain "lookup failed" line unless its prompt
    # contains the "Previous file_writer output was invalid" marker; only
    # then does it emit a fenced file block for app.py. The run must still
    # complete and the proposed patch must add "new".
    writer_script = "\n".join(
        (
            "import sys",
            "prompt = sys.stdin.read()",
            "if 'Previous file_writer output was invalid' not in prompt:",
            " print('lookup failed')",
            "else:",
            " print('```file:app.py')",
            " print('new')",
            " print('```')",
        )
    )
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        _write_common_files(workspace)
        workspace.joinpath("app.py").write_text("old\n", encoding="utf-8")
        workspace.joinpath("fake_writer.py").write_text(writer_script, encoding="utf-8")
        config = make_config(
            workspace,
            (
                StageConfig(id="write", type="file_writer", agent="writer"),
                StageConfig(id="validate", type="patch_validator"),
            ),
        )
        writer_agent = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        config.agents["writer"] = writer_agent
        store = ArtifactStore(workspace, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        outcome = runner.run_task(parse_tasks(TASK_MD)[0])
        patch_path = workspace.joinpath(
            ".nightshift", "runs", "test-run", "tasks", "TASK-001", "proposed.patch"
        )
        self.assertEqual(outcome.status, "complete")
        self.assertIn("+new", patch_path.read_text(encoding="utf-8"))
def test_file_writer_invalid_output_retry_uses_compact_summary(self) -> None:
    # First attempt: the fake writer opens a fenced block for app.py, prints
    # 5000 "x" characters and never closes the fence. Once the prompt
    # contains the "Previous file_writer output was invalid" marker, the
    # script saves that retry prompt to retry-prompt.txt and emits a
    # well-formed block. The assertions check that the saved retry prompt
    # mentions the invalid-output summary, carries a truncation marker, and
    # stays under 9000 characters.
    with tempfile.TemporaryDirectory() as directory:
        root = Path(directory)
        _write_common_files(root)
        (root / "app.py").write_text("old\n", encoding="utf-8")
        (root / "fake_writer.py").write_text(
            "\n".join(
                [
                    "import sys",
                    "prompt = sys.stdin.read()",
                    "if 'Previous file_writer output was invalid' not in prompt:",
                    " print('```file:app.py')",
                    " print('x' * 5000)",
                    "else:",
                    # Use a context manager so the captured prompt is flushed
                    # and closed deterministically instead of relying on the
                    # interpreter finalizing an anonymous file object.
                    " with open('retry-prompt.txt', 'w', encoding='utf-8') as handle: handle.write(prompt)",
                    " print('```file:app.py')",
                    " print('new')",
                    " print('```')",
                ]
            ),
            encoding="utf-8",
        )
        stages = (
            StageConfig(id="write", type="file_writer", agent="writer"),
            StageConfig(id="validate", type="patch_validator"),
        )
        config = make_config(root, stages)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
        result = runner.run_task(parse_tasks(TASK_MD)[0])
        # The fake writer leaves the retry prompt it received in the project root.
        retry_prompt = (root / "retry-prompt.txt").read_text(encoding="utf-8")
        self.assertEqual(result.status, "complete")
        self.assertIn("invalid_file_writer_output_summary", retry_prompt)
        self.assertIn("... <truncated>", retry_prompt)
        self.assertLess(len(retry_prompt), 9000)
def test_state_file_writer_invalid_output_retry_uses_delimiter_format(self) -> None:
    # The file_writer stage here is restricted to four story/*.md state
    # files. The fake writer answers the first prompt with an invalid line;
    # when the prompt contains the "Previous file_writer output was invalid"
    # marker it records that prompt and replies with a FILE/---CONTENT---/
    # ---END--- delimiter block. The assertions check that the retry prompt
    # asks for delimiter blocks and not for fenced blocks.
    with tempfile.TemporaryDirectory() as directory:
        root = Path(directory)
        _write_common_files(root)
        story = root / "story"
        story.mkdir()
        (story / "plot-state.md").write_text("old\n", encoding="utf-8")
        (root / "fake_writer.py").write_text(
            "\n".join(
                [
                    "import sys",
                    "prompt = sys.stdin.read()",
                    "if 'Previous file_writer output was invalid' not in prompt:",
                    " print('lookup failed')",
                    "else:",
                    # Use a context manager so the captured prompt is flushed
                    # and closed deterministically instead of relying on the
                    # interpreter finalizing an anonymous file object.
                    " with open('retry-prompt.txt', 'w', encoding='utf-8') as handle: handle.write(prompt)",
                    " print('FILE: story/plot-state.md')",
                    " print('---CONTENT---')",
                    " print('old')",
                    " print('new')",
                    " print('---END---')",
                ]
            ),
            encoding="utf-8",
        )
        stages = (
            StageConfig(
                id="update_state",
                type="file_writer",
                agent="writer",
                allowed_paths=(
                    "story/plot-state.md",
                    "story/characters.md",
                    "story/timeline.md",
                    "story/unresolved-threads.md",
                ),
            ),
        )
        config = make_config(root, stages)
        config.agents["writer"] = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
        result = runner.run_task(parse_tasks(TASK_MD)[0])
        # The fake writer leaves the retry prompt it received in the project root.
        retry_prompt = (root / "retry-prompt.txt").read_text(encoding="utf-8")
        self.assertEqual(result.status, "complete")
        self.assertIn("Use delimiter file blocks only", retry_prompt)
        self.assertNotIn("Use complete fenced file blocks", retry_prompt)
def test_file_writer_retry_compacts_large_previous_outputs(self) -> None:
    # With retry_count=1 and max_chars=100, the 5000-character entry must
    # come back with a truncation notice and fewer than 180 characters,
    # while the short validation note must be returned unchanged.
    oversized_patch = "a" * 5000
    validation_note = "Patch validation failed"
    previous = {
        "scene-draft.patch": oversized_patch,
        "draft-validation.md": validation_note,
    }
    result = _file_writer_previous_outputs(previous, retry_count=1, max_chars=100)
    patch_summary = result["scene-draft.patch"]
    self.assertIn("previous output truncated", patch_summary)
    self.assertLess(len(patch_summary), 180)
    self.assertEqual(result["draft-validation.md"], validation_note)
def test_file_writer_first_attempt_preserves_large_previous_outputs(self) -> None:
    # With retry_count=0 the 5000-character entry must be returned intact
    # even though max_chars is only 100.
    large_plan = "a" * 5000
    result = _file_writer_previous_outputs({"plan": large_plan}, retry_count=0, max_chars=100)
    self.assertEqual(result["plan"], large_plan)
def test_file_writer_previous_outputs_strip_wrapped_agent_prompts(self) -> None:
    # Feed in a wrapped agent-output document (stdout, stderr and Prompt
    # sections) and expect only the stdout payload to come back.
    wrapped_output = (
        "# Agent Output: plan\n"
        "\n"
        "## stdout\n"
        "\n"
        "```text\n"
        "useful plan\n"
        "```\n"
        "\n"
        "## stderr\n"
        "\n"
        "```text\n"
        "```\n"
        "\n"
        "## Prompt\n"
        "\n"
        "```markdown\n"
        "huge prompt marker\n"
        "```"
    )
    result = _file_writer_previous_outputs({"plan": wrapped_output}, retry_count=0)
    plan_text = result["plan"]
    self.assertEqual(plan_text, "useful plan")
    self.assertNotIn("huge prompt marker", plan_text)
def test_state_update_file_writer_gets_focused_context_and_current_files(self) -> None:
    # A planner stage prints "huge-plan-marker" 1000 times, then a
    # file_writer stage restricted to story/plot-state.md runs. The fake
    # state writer records the prompt it receives and only emits a valid
    # delimiter block when that prompt contains "current_allowed_files" and
    # does not contain the planner marker. The assertions check the recorded
    # prompt for the current file content and the absence of the marker.
    with tempfile.TemporaryDirectory() as directory:
        root = Path(directory)
        _write_common_files(root)
        (root / "story").mkdir()
        (root / "story" / "plot-state.md").write_text("# Plot State\n\n- Before\n", encoding="utf-8")
        (root / "fake_state_writer.py").write_text(
            "\n".join(
                [
                    "import sys",
                    "prompt = sys.stdin.read()",
                    # Use a context manager so the captured prompt is flushed
                    # and closed deterministically instead of relying on the
                    # interpreter finalizing an anonymous file object.
                    "with open('state-prompt.txt', 'w', encoding='utf-8') as handle: handle.write(prompt)",
                    "if 'current_allowed_files' in prompt and 'huge-plan-marker' not in prompt:",
                    " print('FILE: story/plot-state.md')",
                    " print('---CONTENT---')",
                    " print('# Plot State')",
                    " print()",
                    " print('- Before')",
                    " print('- After')",
                    " print('---END---')",
                    "else:",
                    " print('')",
                ]
            ),
            encoding="utf-8",
        )
        config = make_config(
            root,
            (
                StageConfig(id="plan", type="agent", agent="planner", output="plan.md"),
                StageConfig(
                    id="update_state",
                    type="file_writer",
                    agent="state_updater",
                    allowed_paths=("story/plot-state.md",),
                ),
            ),
        )
        # Replace the default planner with one that produces a very large,
        # easily recognizable output.
        config.agents["planner"] = AgentConfig(
            id="planner",
            backend="command",
            command="python -c \"print('huge-plan-marker' * 1000)\"",
            system_prompt=Path("planner.md"),
        )
        config.agents["state_updater"] = AgentConfig(
            id="state_updater",
            backend="command",
            command="python fake_state_writer.py",
            system_prompt=Path("planner.md"),
        )
        runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
        result = runner.run_task(parse_tasks(TASK_MD)[0])
        # The fake state writer leaves the prompt it received in the project root.
        prompt = (root / "state-prompt.txt").read_text(encoding="utf-8")
        self.assertEqual(result.status, "complete")
        self.assertIn("current_allowed_files", prompt)
        self.assertIn("# Plot State", prompt)
        self.assertNotIn("huge-plan-marker", prompt)
def test_scene_editor_file_writer_gets_current_scene_file(self) -> None:
    # Uses its own task definition that names a scene file under "Writes:".
    # The fake editor records the prompt it receives and only emits a valid
    # delimiter block when that prompt contains "current_scene_file" and the
    # existing scene text. The assertions check the recorded prompt for both.
    task_md = """# Tasks
- [ ] SCENE-001: Edit scene
Description:
Repair the scene.
Acceptance Criteria:
- Writes:
- `story/chapters/chapter-001/scene-001.md`
"""
    with tempfile.TemporaryDirectory() as directory:
        root = Path(directory)
        _write_common_files(root)
        # Overwrite the shared tasks.md with the scene-specific task.
        (root / "tasks.md").write_text(task_md, encoding="utf-8")
        scene_path = root / "story" / "chapters" / "chapter-001" / "scene-001.md"
        scene_path.parent.mkdir(parents=True)
        scene_path.write_text("Proxy walked home.\n", encoding="utf-8")
        (root / "fake_editor.py").write_text(
            "\n".join(
                [
                    "import sys",
                    "prompt = sys.stdin.read()",
                    # Use a context manager so the captured prompt is flushed
                    # and closed deterministically instead of relying on the
                    # interpreter finalizing an anonymous file object.
                    "with open('editor-prompt.txt', 'w', encoding='utf-8') as handle: handle.write(prompt)",
                    "if 'current_scene_file' in prompt and 'Proxy walked home.' in prompt:",
                    " print('FILE: story/chapters/chapter-001/scene-001.md')",
                    " print('---CONTENT---')",
                    " print('Proxy walked home corrected.')",
                    " print('---END---')",
                    "else:",
                    " print('')",
                ]
            ),
            encoding="utf-8",
        )
        stages = (
            StageConfig(
                id="edit_scene",
                type="file_writer",
                agent="editor",
                allowed_paths=("story/chapters",),
            ),
        )
        config = make_config(root, stages)
        config.agents["editor"] = AgentConfig(
            id="editor",
            backend="command",
            command="python fake_editor.py",
            system_prompt=Path("planner.md"),
        )
        runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
        result = runner.run_task(parse_tasks(task_md)[0])
        # The fake editor leaves the prompt it received in the project root.
        prompt = (root / "editor-prompt.txt").read_text(encoding="utf-8")
        self.assertEqual(result.status, "complete")
        self.assertIn("current_scene_file", prompt)
        self.assertIn("Proxy walked home.", prompt)
def test_patch_validator_rejects_unsafe_patch(self) -> None:
    # The fake writer prints a diff that targets .nightshift/log.txt. The
    # run is expected to fail with a reason mentioning "forbidden path".
    writer_script = "\n".join(
        (
            "print('diff --git a/.nightshift/log.txt b/.nightshift/log.txt')",
            "print('--- a/.nightshift/log.txt')",
            "print('+++ b/.nightshift/log.txt')",
            "print('@@ -1 +1 @@')",
            "print('-old')",
            "print('+new')",
        )
    )
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        _write_common_files(workspace)
        pipeline_stages = (
            StageConfig(id="write", type="code_writer", agent="writer"),
            StageConfig(id="validate", type="patch_validator"),
        )
        workspace.joinpath("fake_writer.py").write_text(writer_script, encoding="utf-8")
        config = make_config(workspace, pipeline_stages)
        writer_agent = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        config.agents["writer"] = writer_agent
        store = ArtifactStore(workspace, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        outcome = runner.run_task(parse_tasks(TASK_MD)[0])
        self.assertEqual(outcome.status, "failed")
        self.assertIn("forbidden path", outcome.reason)
def test_patch_validation_failure_can_retry_implementation(self) -> None:
    # While the prompt lacks "Retry 1:", the fake writer emits a "new file"
    # patch for app.py, which already exists; afterwards it emits an
    # ordinary old -> new modification. The validate stage is configured
    # with on_fail="write" and one retry is allowed. The assertions expect a
    # completed run with retry_count == 1, a stage reason mentioning
    # "creates existing file", and both first-attempt and "-1" artifacts.
    writer_script = "\n".join(
        (
            "import sys",
            "prompt = sys.stdin.read()",
            "new_file_patch = 'Retry 1:' not in prompt",
            "if new_file_patch:",
            " print('diff --git a/app.py b/app.py')",
            " print('new file mode 100644')",
            " print('--- /dev/null')",
            " print('+++ b/app.py')",
            " print('@@ -0,0 +1 @@')",
            " print('+bad')",
            "else:",
            " print('diff --git a/app.py b/app.py')",
            " print('--- a/app.py')",
            " print('+++ b/app.py')",
            " print('@@ -1 +1 @@')",
            " print('-old')",
            " print('+new')",
        )
    )
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        _write_common_files(workspace)
        workspace.joinpath("app.py").write_text("old\n", encoding="utf-8")
        workspace.joinpath("fake_writer.py").write_text(writer_script, encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="write", type="code_writer", agent="writer", output="proposed.patch"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator", on_fail="write"),
        )
        config = make_config(workspace, pipeline_stages, max_retries=1)
        writer_agent = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        config.agents["writer"] = writer_agent
        store = ArtifactStore(workspace, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        outcome = runner.run_task(parse_tasks(TASK_MD)[0])
        artifacts = workspace.joinpath(".nightshift", "runs", "test-run", "tasks", "TASK-001")
        self.assertEqual(outcome.status, "complete")
        self.assertEqual(outcome.retry_count, 1)
        saw_conflict = any(
            "creates existing file" in stage_result.reason
            for stage_result in outcome.stage_results
        )
        self.assertTrue(saw_conflict)
        expected_artifacts = (
            "repair-1.patch",
            "normalized.patch",
            "normalized-1.patch",
            "patch-validation.md",
            "patch-validation-1.md",
        )
        for artifact_name in expected_artifacts:
            self.assertTrue(artifacts.joinpath(artifact_name).exists())
def test_patch_apply_stage_applies_patch(self) -> None:
    # The fake writer prints a diff changing app.py from "old" to "new".
    # After write -> normalize -> validate -> apply, the test expects
    # app.py to contain "new" and the apply-stage artifacts to exist.
    writer_script = "\n".join(
        (
            "print('diff --git a/app.py b/app.py')",
            "print('--- a/app.py')",
            "print('+++ b/app.py')",
            "print('@@ -1 +1 @@')",
            "print('-old')",
            "print('+new')",
        )
    )
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        _write_common_files(workspace)
        target = workspace.joinpath("app.py")
        target.write_text("old\n", encoding="utf-8")
        workspace.joinpath("fake_writer.py").write_text(writer_script, encoding="utf-8")
        pipeline_stages = (
            StageConfig(id="write", type="code_writer", agent="writer"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator"),
            StageConfig(id="apply", type="patch_apply", mode="apply"),
        )
        config = make_config(workspace, pipeline_stages)
        writer_agent = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        config.agents["writer"] = writer_agent
        store = ArtifactStore(workspace, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        outcome = runner.run_task(parse_tasks(TASK_MD)[0])
        artifacts = workspace.joinpath(".nightshift", "runs", "test-run", "tasks", "TASK-001")
        self.assertEqual(outcome.status, "complete")
        self.assertEqual(target.read_text(encoding="utf-8"), "new\n")
        expected_artifacts = (
            "applied.patch",
            "patch-apply-output.txt",
            "git-status-before-patch-apply.txt",
            "git-status-after-patch-apply.txt",
        )
        for artifact_name in expected_artifacts:
            self.assertTrue(artifacts.joinpath(artifact_name).exists())
def test_test_failure_repairs_with_second_patch(self) -> None:
    # The fake writer inspects app.py on disk: from "old" it proposes
    # old -> bad, and from "bad" it proposes bad -> new. The command stage
    # exits non-zero (writing "expected new" to stderr) unless app.py holds
    # "new", and is configured with on_fail="write" with one retry allowed.
    # The assertions expect a completed run after exactly one retry, the
    # final file content "new", and the "-1" repair artifacts.
    check_command = 'python -c "from pathlib import Path; import sys; ok = Path(\'app.py\').read_text().strip() == \'new\'; sys.stderr.write(\'expected new\\n\' if not ok else \'\'); raise SystemExit(0 if ok else 1)"'
    writer_script = "\n".join(
        (
            "from pathlib import Path",
            "current = Path('app.py').read_text()",
            "old, new = ('bad', 'new') if current == 'bad\\n' else ('old', 'bad')",
            "print('diff --git a/app.py b/app.py')",
            "print('--- a/app.py')",
            "print('+++ b/app.py')",
            "print('@@ -1 +1 @@')",
            "print('-' + old)",
            "print('+' + new)",
        )
    )
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp)
        _write_common_files(workspace)
        target = workspace.joinpath("app.py")
        target.write_text("old\n", encoding="utf-8")
        workspace.joinpath("fake_writer.py").write_text(writer_script, encoding="utf-8")
        test_stage = StageConfig(
            id="test",
            type="command",
            commands=(check_command,),
            output="test-output.txt",
            on_fail="write",
        )
        pipeline_stages = (
            StageConfig(id="write", type="code_writer", agent="writer"),
            StageConfig(id="normalize", type="patch_normalizer"),
            StageConfig(id="validate", type="patch_validator"),
            StageConfig(id="apply", type="patch_apply", mode="apply"),
            test_stage,
        )
        # Swap in a safety policy that allows exactly the check command.
        config = replace(
            make_config(workspace, pipeline_stages, max_retries=1),
            safety=SafetyConfig(
                require_clean_worktree=False,
                scoped_paths=(".",),
                allowed_commands=(check_command,),
                forbidden_commands=("rm -rf",),
            ),
        )
        writer_agent = AgentConfig(
            id="writer",
            backend="command",
            command="python fake_writer.py",
            system_prompt=Path("planner.md"),
        )
        config.agents["writer"] = writer_agent
        store = ArtifactStore(workspace, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        outcome = runner.run_task(parse_tasks(TASK_MD)[0])
        artifacts = workspace.joinpath(".nightshift", "runs", "test-run", "tasks", "TASK-001")
        self.assertEqual(outcome.status, "complete")
        self.assertEqual(outcome.retry_count, 1)
        self.assertEqual(target.read_text(encoding="utf-8"), "new\n")
        for artifact_name in ("repair-1.patch", "repair-summary-1.md"):
            self.assertTrue(artifacts.joinpath(artifact_name).exists())
        retry_agent_output = artifacts.joinpath("write-agent-output-1.md").read_text(encoding="utf-8")
        self.assertIn("expected new", retry_agent_output)
        retry_artifacts = (
            "normalized-1.patch",
            "patch-validation-1.md",
            "applied-1.patch",
            "patch-apply-output-1.txt",
        )
        for artifact_name in retry_artifacts:
            self.assertTrue(artifacts.joinpath(artifact_name).exists())
def _write_common_files(root: Path) -> None:
    """Write the shared fixture files used by the tests under *root*.

    Creates ``nightshift.yaml``, ``tasks.md`` (from ``TASK_MD``), and the
    ``planner.md`` / ``reviewer.md`` prompt files, all encoded as UTF-8.
    """
    fixtures = (
        ("nightshift.yaml", "project:\n name: test\n"),
        ("tasks.md", TASK_MD),
        ("planner.md", "Plan."),
        ("reviewer.md", "Review."),
    )
    for filename, content in fixtures:
        root.joinpath(filename).write_text(content, encoding="utf-8")
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()