mirror of
https://github.com/khodges42/nightShift.git
synced 2026-06-14 10:08:37 +00:00
Checked out commit from rsarv3006 which is super interesting, grabbed some inspiration from it and mentioned it in the ideas file.
380 lines
11 KiB
Python
380 lines
11 KiB
Python
from pathlib import Path
|
|
import tempfile
|
|
import unittest
|
|
|
|
from nightshift.config import SafetyConfig
|
|
from nightshift.errors import PipelineError
|
|
from nightshift.patches import (
|
|
generate_patch_from_file_updates,
|
|
normalize_patch_text,
|
|
parse_file_updates,
|
|
repair_hunk_counts,
|
|
validate_patch,
|
|
)
|
|
|
|
|
|
# Minimal single-file unified diff shared by the tests below: one hunk that
# replaces the line "old" with "new" in src/app.py.
PATCH = """diff --git a/src/app.py b/src/app.py
--- a/src/app.py
+++ b/src/app.py
@@ -1 +1 @@
-old
+new
"""
|
|
|
|
|
|
class PatchTests(unittest.TestCase):
    """Tests for the helpers imported from ``nightshift.patches``.

    Exercises ``normalize_patch_text``, ``repair_hunk_counts``,
    ``validate_patch``, ``parse_file_updates`` and
    ``generate_patch_from_file_updates``. Tests that touch the filesystem
    work inside a throwaway ``tempfile.TemporaryDirectory``.
    """

    @staticmethod
    def _safety(*scoped_paths: str) -> "SafetyConfig":
        """Return the SafetyConfig used throughout these tests.

        Only ``scoped_paths`` varies between tests; the clean-worktree
        requirement is switched off and both command lists are empty.
        """
        return SafetyConfig(
            require_clean_worktree=False,
            scoped_paths=scoped_paths,
            allowed_commands=(),
            forbidden_commands=(),
        )

    def test_normalize_extracts_fenced_patch(self) -> None:
        # A diff wrapped in prose and a ```diff fence is reduced to the bare diff.
        text = f"Here it is:\n```diff\n{PATCH}```\n"

        self.assertEqual(normalize_patch_text(text), PATCH)

    def test_validate_patch_enforces_scopes(self) -> None:
        # A patch confined to the scoped "src" directory validates and reports
        # the touched file plus one removed and one added line.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            safety = self._safety("src")

            result = validate_patch(PATCH, root, safety)

            self.assertEqual(result.files, ("src/app.py",))
            self.assertEqual(result.changed_lines, 2)

    def test_validate_patch_rejects_forbidden_path(self) -> None:
        # Even with the whole root in scope, a patch touching
        # .nightshift/log.txt is refused as a forbidden path.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            safety = self._safety(".")
            patch = PATCH.replace("src/app.py", ".nightshift/log.txt")

            with self.assertRaisesRegex(PipelineError, "forbidden path"):
                validate_patch(patch, root, safety)

    def test_validate_patch_enforces_stage_allowed_paths(self) -> None:
        # The patch is inside the configured scopes, but the per-call
        # allowed_paths restricts this stage to "tests", so src/app.py fails.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            (root / "tests").mkdir()
            safety = self._safety("src", "tests")

            with self.assertRaisesRegex(PipelineError, "not allowed for this stage"):
                validate_patch(PATCH, root, safety, allowed_paths=("tests",))

    def test_validate_patch_rejects_malformed_hunk_line(self) -> None:
        # "bare line" carries no leading ' ', '+' or '-' marker.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            safety = self._safety("src")
            patch = """diff --git a/src/app.py b/src/app.py
--- a/src/app.py
+++ b/src/app.py
@@ -1 +1,2 @@
-old
+new
bare line
"""

            with self.assertRaisesRegex(PipelineError, "malformed hunk line"):
                validate_patch(patch, root, safety)

    def test_validate_patch_rejects_new_file_when_target_exists(self) -> None:
        # The patch declares src/app.py as a new file although it already
        # exists on disk.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            (root / "src" / "app.py").write_text("old\n", encoding="utf-8")
            safety = self._safety("src")
            patch = """diff --git a/src/app.py b/src/app.py
new file mode 100644
--- /dev/null
+++ b/src/app.py
@@ -0,0 +1 @@
+new
"""

            with self.assertRaisesRegex(PipelineError, "creates existing file"):
                validate_patch(patch, root, safety)

    def test_validate_patch_rejects_hunk_count_mismatch(self) -> None:
        # The hunk header promises two new-side lines; the body has only one.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            safety = self._safety("src")
            patch = """diff --git a/src/app.py b/src/app.py
--- a/src/app.py
+++ b/src/app.py
@@ -1 +1,2 @@
-old
+new
"""

            with self.assertRaisesRegex(PipelineError, "new line count expected 2, got 1"):
                validate_patch(patch, root, safety)

    def test_normalize_repairs_hunk_count_mismatch(self) -> None:
        # The header claims 40 added lines while the body holds 38; the
        # normalized text is expected to carry the corrected count.
        lines = "\n".join(f"+line {number}" for number in range(38))
        patch = f"""diff --git a/src/app.py b/src/app.py
--- /dev/null
+++ b/src/app.py
@@ -0,0 +1,40 @@
{lines}
"""

        normalized = normalize_patch_text(patch)

        self.assertIn("@@ -0,0 +1,38 @@", normalized)

    def test_validate_patch_counts_hunk_lines_that_look_like_headers(self) -> None:
        # Body lines such as "---" and "+++" are removals/additions whose
        # content happens to start with dashes/pluses; they must be counted as
        # hunk lines (four changed lines here), not treated as file headers.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            safety = self._safety("src")
            patch = """diff --git a/src/app.py b/src/app.py
--- a/src/app.py
+++ b/src/app.py
@@ -1,3 +1,3 @@
 context
---
----
+++
++++
"""

            result = validate_patch(patch, root, safety)

            self.assertEqual(result.changed_lines, 4)

    def test_repair_hunk_counts_counts_header_like_body_lines(self) -> None:
        # One context line plus one header-like removal and one header-like
        # addition: both sides of the repaired header should read 2.
        patch = """diff --git a/src/app.py b/src/app.py
--- a/src/app.py
+++ b/src/app.py
@@ -1 +1 @@
 context
---
+++
"""

        repaired = repair_hunk_counts(patch)

        self.assertIn("@@ -1,2 +1,2 @@", repaired)

    def test_validate_patch_accepts_multiple_files(self) -> None:
        # Two file sections in one patch are both reported, in patch order.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            safety = self._safety("src")
            patch = """diff --git a/src/app.py b/src/app.py
--- a/src/app.py
+++ b/src/app.py
@@ -1 +1 @@
-old
+new
diff --git a/src/test_app.py b/src/test_app.py
--- a/src/test_app.py
+++ b/src/test_app.py
@@ -1 +1 @@
-old test
+new test
"""

            result = validate_patch(patch, root, safety)

            self.assertEqual(result.files, ("src/app.py", "src/test_app.py"))

    def test_file_updates_generate_unified_diff(self) -> None:
        # src/app.py exists (modified); src/test_app.py does not (created).
        # The generated patch must itself pass validate_patch.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "src").mkdir()
            (root / "src" / "app.py").write_text("old\n", encoding="utf-8")
            safety = self._safety("src")
            updates = parse_file_updates(
                """```file:src/app.py
new
```
```file:src/test_app.py
test
```
"""
            )

            patch = generate_patch_from_file_updates(updates, root, safety)
            result = validate_patch(patch, root, safety)

            self.assertIn("diff --git a/src/app.py b/src/app.py", patch)
            self.assertIn("diff --git a/src/test_app.py b/src/test_app.py", patch)
            self.assertIn("new file mode 100644", patch)
            self.assertEqual(result.files, ("src/app.py", "src/test_app.py"))

    def test_file_updates_parse_explicit_delimiters(self) -> None:
        # FILE: / ---CONTENT--- / ---END--- form with a single block.
        updates = parse_file_updates(
            """FILE: story/chapters/chapter-001/scene-001.md
---CONTENT---
Sunlight did not belong here.
---END---
"""
        )

        self.assertEqual(len(updates), 1)
        self.assertEqual(updates[0].path, "story/chapters/chapter-001/scene-001.md")
        self.assertEqual(updates[0].content, "Sunlight did not belong here.\n")

    def test_file_updates_parse_delimiters_without_end_before_next_file(self) -> None:
        # No ---END--- markers: a block ends at the next FILE: line or at the
        # end of the text, and the blank line before the next FILE: is not
        # part of the content.
        updates = parse_file_updates(
            """Intro prose is ignored.

FILE: story/plot-state.md
---CONTENT---
# Plot State

- Scene two happened.

FILE: story/timeline.md
---CONTENT---
# Timeline

- SCENE-002 complete.
"""
        )

        self.assertEqual(len(updates), 2)
        self.assertEqual(updates[0].path, "story/plot-state.md")
        self.assertEqual(updates[0].content, "# Plot State\n\n- Scene two happened.\n")
        self.assertEqual(updates[1].path, "story/timeline.md")
        self.assertEqual(updates[1].content, "# Timeline\n\n- SCENE-002 complete.\n")

    def test_file_updates_parse_mixed_delimiter_end_and_next_file(self) -> None:
        # First block is closed by ---END---, second by the end of the text.
        updates = parse_file_updates(
            """FILE: story/plot-state.md
---CONTENT---
first
---END---

FILE: story/timeline.md
---CONTENT---
second
"""
        )

        self.assertEqual(len(updates), 2)
        self.assertEqual(updates[0].content, "first\n")
        self.assertEqual(updates[1].content, "second\n")

    def test_file_updates_reject_duplicate_blocks(self) -> None:
        # Two blocks for the same path with different content are an error.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            safety = self._safety(".")
            updates = parse_file_updates(
                """```file:app.py
one
```
```file:app.py
two
```
"""
            )

            with self.assertRaisesRegex(PipelineError, "duplicate file block"):
                generate_patch_from_file_updates(updates, root, safety)

    def test_file_updates_enforce_stage_allowed_paths(self) -> None:
        # story/plot-state.md is in scope but outside the stage's
        # allowed_paths ("story/chapters"), so generation must fail.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "story" / "chapters").mkdir(parents=True)
            safety = self._safety("story")
            updates = parse_file_updates(
                """```file:story/chapters/scene.md
scene
```
```file:story/plot-state.md
state
```
"""
            )

            with self.assertRaisesRegex(PipelineError, "not allowed for this stage"):
                generate_patch_from_file_updates(
                    updates,
                    root,
                    safety,
                    allowed_paths=("story/chapters",),
                )

    def test_file_updates_allow_identical_duplicate_blocks(self) -> None:
        # Repeating a block with identical content is tolerated and yields a
        # single diff section for that file.
        with tempfile.TemporaryDirectory() as directory:
            root = Path(directory)
            (root / "app.py").write_text("old\n", encoding="utf-8")
            safety = self._safety(".")
            updates = parse_file_updates(
                """```file:app.py
new
```
```file:app.py
new
```
"""
            )

            patch = generate_patch_from_file_updates(updates, root, safety)

            self.assertEqual(patch.count("diff --git a/app.py b/app.py"), 1)
|
|
|
|
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|