documentation pass and hardening bugfixes

This commit is contained in:
K. Hodges 2026-05-17 00:49:17 -07:00
parent d84d580671
commit 528c0ddeb5
17 changed files with 397 additions and 12 deletions

82
QUICKSTART.md Normal file
View File

@ -0,0 +1,82 @@
# NightShift Quickstart
This guide walks you through running the current MVP using safe example files.
## 1. Install for Development
```bash
pip install -e .
```
Or run the module directly:
```bash
python -m nightshift.cli --help
```
## 2. Create Starter Files
From a project directory:
```bash
nightshift init
```
This creates:
```text
nightshift.yaml
tasks.md
agents/
```
Existing starter files are not overwritten unless you pass `--force`.
## 3. Validate
```bash
nightshift validate
```
Validation checks config structure, task parsing, prompt files, scoped paths, and command safety.
## 4. Run One Task
Run the next incomplete task:
```bash
nightshift run
```
Run a specific task:
```bash
nightshift run --task TASK-001
```
## 5. Review Artifacts
After a run, inspect:
```text
.nightshift/runs/<run-id>/
```
Useful files:
```text
run-summary.md
config.snapshot.yaml
tasks/TASK-001/task.md
tasks/TASK-001/context.md
tasks/TASK-001/plan.md
tasks/TASK-001/test-output.txt
tasks/TASK-001/stage-results.md
tasks/TASK-001/context-out.md
tasks/TASK-001/final-notes.md
```
## Example Templates
Example run files are available in `templates/`.
They are safe starter examples and use command-backed fake agents.

View File

@ -1,5 +1,7 @@
# NightShift
![NightShift logo](docs/images/logo.png)
Auditable local-first AI coding pipelines.
NightShift is a deterministic pipeline runner for long-running AI-assisted coding workflows. It runs one markdown task at a time through a declarative YAML pipeline, records the important artifacts, and leaves the user with a reviewable work package.

BIN
docs/images/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 MiB

View File

@ -159,8 +159,8 @@ class AgentExecutor:
command=agent.command, command=agent.command,
prompt=prompt, prompt=prompt,
exit_code=-1, exit_code=-1,
stdout=exc.stdout or "", stdout=_coerce_output(exc.stdout),
stderr=exc.stderr or "", stderr=_coerce_output(exc.stderr),
duration_seconds=duration, duration_seconds=duration,
timed_out=True, timed_out=True,
) )
@ -225,6 +225,14 @@ def build_prompt_bundle(
) )
def _coerce_output(value: str | bytes | None) -> str:
if value is None:
return ""
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
return value
def output_contract_for(stage: StageConfig) -> str: def output_contract_for(stage: StageConfig) -> str:
if stage.type in {"agent_review", "review"}: if stage.type in {"agent_review", "review"}:
return "\n".join( return "\n".join(

View File

@ -6,6 +6,7 @@ from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
import shutil import shutil
import re
from .config import NightShiftConfig from .config import NightShiftConfig
from .errors import ArtifactError, SafetyError from .errors import ArtifactError, SafetyError
@ -32,7 +33,7 @@ class ArtifactStore:
except SafetyError as exc: except SafetyError as exc:
raise ArtifactError(str(exc)) from exc raise ArtifactError(str(exc)) from exc
self.run_id = run_id or default_run_id() self.run_id = _safe_artifact_segment(run_id or default_run_id(), "run id")
self.run_dir = self._artifact_path("runs", self.run_id) self.run_dir = self._artifact_path("runs", self.run_id)
self.tasks_dir = self.run_dir / "tasks" self.tasks_dir = self.run_dir / "tasks"
self.project_context_path = self.artifact_root / "project-context.md" self.project_context_path = self.artifact_root / "project-context.md"
@ -71,10 +72,11 @@ class ArtifactStore:
"""Create the artifact directory for one task.""" """Create the artifact directory for one task."""
self.initialize_run() self.initialize_run()
task_dir = self._artifact_path("runs", self.run_id, "tasks", task_id) safe_task_id = _safe_artifact_segment(task_id, "task id")
task_dir = self._artifact_path("runs", self.run_id, "tasks", safe_task_id)
task_dir.mkdir(parents=True, exist_ok=True) task_dir.mkdir(parents=True, exist_ok=True)
return TaskArtifactPaths( return TaskArtifactPaths(
task_id=task_id, task_id=safe_task_id,
directory=task_dir, directory=task_dir,
task_snapshot=task_dir / "task.md", task_snapshot=task_dir / "task.md",
) )
@ -122,3 +124,15 @@ def default_run_id(now: datetime | None = None) -> str:
value = now or datetime.now(timezone.utc) value = now or datetime.now(timezone.utc)
return value.strftime("%Y%m%dT%H%M%SZ") return value.strftime("%Y%m%dT%H%M%SZ")
def _safe_artifact_segment(value: str, context: str) -> str:
if not isinstance(value, str) or not value:
raise ArtifactError(f"Artifact error: {context} must be a non-empty string.")
if not re.fullmatch(r"[A-Za-z0-9_.-]+", value):
raise ArtifactError(
f"Artifact error: {context} contains unsafe characters: {value}"
)
if value in {".", ".."}:
raise ArtifactError(f"Artifact error: {context} cannot be '{value}'.")
return value

View File

@ -112,8 +112,8 @@ class CommandExecutor:
return CommandRun( return CommandRun(
command=normalized, command=normalized,
exit_code=-1, exit_code=-1,
stdout=exc.stdout or "", stdout=_coerce_output(exc.stdout),
stderr=exc.stderr or "", stderr=_coerce_output(exc.stderr),
duration_seconds=duration, duration_seconds=duration,
timed_out=True, timed_out=True,
) )
@ -146,3 +146,11 @@ def format_command_runs(stage_id: str, runs: list[CommandRun]) -> str:
] ]
) )
return "\n".join(lines) return "\n".join(lines)
def _coerce_output(value: str | bytes | None) -> str:
if value is None:
return ""
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
return value

View File

@ -143,7 +143,10 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
safety_raw = _require_mapping(raw["safety"], "safety") safety_raw = _require_mapping(raw["safety"], "safety")
safety = SafetyConfig( safety = SafetyConfig(
require_clean_worktree=bool(safety_raw.get("require_clean_worktree", False)), require_clean_worktree=_optional_bool(
safety_raw.get("require_clean_worktree", False),
"safety.require_clean_worktree",
),
scoped_paths=_string_tuple(safety_raw.get("scoped_paths", []), "safety.scoped_paths"), scoped_paths=_string_tuple(safety_raw.get("scoped_paths", []), "safety.scoped_paths"),
allowed_commands=_string_tuple(safety_raw.get("allowed_commands", []), "safety.allowed_commands"), allowed_commands=_string_tuple(safety_raw.get("allowed_commands", []), "safety.allowed_commands"),
forbidden_commands=_string_tuple( forbidden_commands=_string_tuple(
@ -159,6 +162,15 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
agent_raw = _require_mapping(agent_raw_value, f"agents.{agent_id}") agent_raw = _require_mapping(agent_raw_value, f"agents.{agent_id}")
backend = _require_string(agent_raw, "backend", f"agents.{agent_id}") backend = _require_string(agent_raw, "backend", f"agents.{agent_id}")
command = _optional_string(agent_raw.get("command"), f"agents.{agent_id}.command") command = _optional_string(agent_raw.get("command"), f"agents.{agent_id}.command")
if backend != "command":
raise ConfigError(
f"Config error: agent '{agent_id}' uses unsupported backend '{backend}'. "
"Supported backends: command."
)
if command is None:
raise ConfigError(
f"Config error: command backend agent '{agent_id}' must define command."
)
system_prompt = Path(_require_string(agent_raw, "system_prompt", f"agents.{agent_id}")) system_prompt = Path(_require_string(agent_raw, "system_prompt", f"agents.{agent_id}"))
agents[str(agent_id)] = AgentConfig( agents[str(agent_id)] = AgentConfig(
id=str(agent_id), id=str(agent_id),
@ -170,7 +182,10 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
) )
pipeline_raw = _require_mapping(raw["pipeline"], "pipeline") pipeline_raw = _require_mapping(raw["pipeline"], "pipeline")
max_task_retries = int(pipeline_raw.get("max_task_retries", 0)) max_task_retries = _optional_int(
pipeline_raw.get("max_task_retries", 0),
"pipeline.max_task_retries",
)
if max_task_retries < 0: if max_task_retries < 0:
raise ConfigError("Config error: pipeline.max_task_retries must be zero or greater.") raise ConfigError("Config error: pipeline.max_task_retries must be zero or greater.")
@ -211,6 +226,10 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
if stage_type in COMMAND_STAGE_TYPES and not commands: if stage_type in COMMAND_STAGE_TYPES and not commands:
raise ConfigError(f"Config error: command stage '{stage_id}' must define commands.") raise ConfigError(f"Config error: command stage '{stage_id}' must define commands.")
if stage_type not in COMMAND_STAGE_TYPES and commands:
raise ConfigError(
f"Config error: non-command stage '{stage_id}' must not define commands."
)
stages.append( stages.append(
StageConfig( StageConfig(
@ -246,7 +265,10 @@ def _load_yaml_mapping(path: Path) -> dict[str, Any]:
except ModuleNotFoundError: except ModuleNotFoundError:
data = _parse_simple_yaml(text) data = _parse_simple_yaml(text)
else: else:
data = yaml.safe_load(text) try:
data = yaml.safe_load(text)
except yaml.YAMLError as exc: # type: ignore[attr-defined]
raise ConfigError(f"Config error: invalid YAML in {path}: {exc}") from exc
if data is None: if data is None:
data = {} data = {}
@ -399,6 +421,18 @@ def _optional_string(value: Any, context: str) -> str | None:
return value return value
def _optional_bool(value: Any, context: str) -> bool:
if isinstance(value, bool):
return value
raise ConfigError(f"Config error: '{context}' must be a boolean.")
def _optional_int(value: Any, context: str) -> int:
if isinstance(value, bool) or not isinstance(value, int):
raise ConfigError(f"Config error: '{context}' must be an integer.")
return value
def _string_tuple(value: Any, context: str) -> tuple[str, ...]: def _string_tuple(value: Any, context: str) -> tuple[str, ...]:
if value is None: if value is None:
return () return ()

View File

@ -11,6 +11,7 @@ from .commands import CommandExecutor
from .config import COMMAND_STAGE_TYPES, NightShiftConfig, StageConfig from .config import COMMAND_STAGE_TYPES, NightShiftConfig, StageConfig
from .context import ContextManager from .context import ContextManager
from .errors import PipelineError from .errors import PipelineError
from .errors import NightShiftError
from .reports import ReportGenerator from .reports import ReportGenerator
from .stages import StageResult from .stages import StageResult
from .tasks import Task from .tasks import Task
@ -72,7 +73,20 @@ class PipelineRunner:
while index < len(stages): while index < len(stages):
stage = stages[index] stage = stages[index]
result = self._run_stage(stage, task, previous_outputs, retry_notes) try:
result = self._run_stage(stage, task, previous_outputs, retry_notes)
except NightShiftError as exc:
result = StageResult(
stage_id=stage.id,
status="fail",
reason=str(exc),
)
except OSError as exc:
result = StageResult(
stage_id=stage.id,
status="fail",
reason=f"Unexpected OS error while running stage: {exc}",
)
stage_results.append(result) stage_results.append(result)
previous_outputs[stage.id] = self._read_output(result.output_path) previous_outputs[stage.id] = self._read_output(result.output_path)
if result.context_update: if result.context_update:
@ -203,4 +217,3 @@ def format_summary_stage(
"", "",
] ]
) )

View File

@ -0,0 +1,8 @@
# Implementer
Describe the smallest implementation steps for the task.
Rules:
- Stay inside the configured project root.
- Keep notes concise.
- Mention any test expectations.

View File

@ -0,0 +1,8 @@
# Planner
Create a concise plan for the task.
Rules:
- Do not edit files.
- Map the task to acceptance criteria.
- Keep output reviewable.

View File

@ -0,0 +1,10 @@
# Reviewer
Review the task result.
For this fake-agent template, return a passing structured review:
status: pass
reason: example reviewer accepted the run
next_stage:
context_update:

61
templates/nightshift.yaml Normal file
View File

@ -0,0 +1,61 @@
project:
name: nightshift-example
root: .
task_file: tasks.md
artifact_dir: .nightshift
safety:
require_clean_worktree: false
scoped_paths:
- .
allowed_commands:
- python -c "print('template command stage ok')"
forbidden_commands:
- rm -rf
- git push
- curl | bash
agents:
planner:
backend: command
command: python -c "print('Plan generated by template planner.')"
system_prompt: agents/planner.md
implementer:
backend: command
command: python -c "print('Implementation notes generated by template implementer.')"
system_prompt: agents/implementer.md
reviewer:
backend: command
command: python -c "print('status: pass'); print('reason: template reviewer accepted the run')"
system_prompt: agents/reviewer.md
pipeline:
max_task_retries: 1
stages:
- id: plan
type: agent
agent: planner
output: plan.md
- id: implement
type: agent
agent: implementer
output: implementation-log.md
- id: test
type: command
commands:
- python -c "print('template command stage ok')"
output: test-output.txt
- id: review
type: agent_review
agent: reviewer
on_fail: implement
output: review.md
- id: summarize
type: summarize
output: final-notes.md

11
templates/tasks.md Normal file
View File

@ -0,0 +1,11 @@
# Tasks
- [ ] TASK-001: Run the example pipeline
Description:
Exercise the NightShift MVP with fake command-backed agents and a harmless test command.
Acceptance Criteria:
- The pipeline creates task artifacts
- The command stage output is recorded
- The final report explains the run status

View File

@ -51,6 +51,17 @@ class ArtifactStoreTests(unittest.TestCase):
with self.assertRaisesRegex(ArtifactError, "escapes task directory"): with self.assertRaisesRegex(ArtifactError, "escapes task directory"):
store.write_stage_output("TASK-001", "../leak.txt", "nope") store.write_stage_output("TASK-001", "../leak.txt", "nope")
def test_run_id_and_task_id_must_be_safe_path_segments(self) -> None:
    """Traversal-style run ids and task ids are rejected with ArtifactError."""
    with tempfile.TemporaryDirectory() as tmp:
        project_root = Path(tmp)

        # A run id containing "../" must be refused when the store is built.
        self.assertRaisesRegex(
            ArtifactError,
            "run id contains unsafe",
            ArtifactStore,
            project_root,
            ".nightshift",
            run_id="../run",
        )

        # With a valid run id, an unsafe task id is refused per task directory.
        safe_store = ArtifactStore(project_root, ".nightshift", run_id="safe-run")
        self.assertRaisesRegex(
            ArtifactError,
            "task id contains unsafe",
            safe_store.create_task_dir,
            "../TASK-001",
        )
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -89,6 +89,36 @@ class CommandExecutorTests(unittest.TestCase):
with self.assertRaisesRegex(CommandError, "not allowlisted"): with self.assertRaisesRegex(CommandError, "not allowlisted"):
executor.run_command(FAILING_COMMAND) executor.run_command(FAILING_COMMAND)
def test_command_timeout_returns_failed_stage_and_writes_output(self) -> None:
    """A command that outlives the timeout fails the stage and still records output."""
    slow_command = 'python -c "import time; print(\'start\'); time.sleep(2)"'
    with tempfile.TemporaryDirectory() as tmp:
        project_root = Path(tmp)
        store = ArtifactStore(project_root, ".nightshift", run_id="test-run")
        # The slow command is allowlisted so only the timeout can fail the stage.
        safety = SafetyConfig(
            require_clean_worktree=False,
            scoped_paths=(".",),
            allowed_commands=(slow_command,),
            forbidden_commands=("rm -rf",),
        )
        executor = CommandExecutor(project_root, safety, store, timeout_seconds=0.1)
        timeout_stage = StageConfig(
            id="test",
            type="command",
            commands=(slow_command,),
            output="test-output.txt",
        )

        result = executor.run_stage(timeout_stage, "TASK-001")

        self.assertEqual(result.status, "fail")
        self.assertIn("timed out", result.reason)
        recorded = (project_root / result.output_path).read_text(encoding="utf-8")
        self.assertIn("Timed out: true", recorded)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -79,6 +79,72 @@ class ConfigTests(unittest.TestCase):
with self.assertRaisesRegex(ConfigError, "not allowlisted"): with self.assertRaisesRegex(ConfigError, "not allowlisted"):
validate_config(config_path) validate_config(config_path)
def test_max_task_retries_must_be_integer(self) -> None:
    """A non-integer pipeline.max_task_retries is rejected while loading config."""
    with tempfile.TemporaryDirectory() as tmp:
        project_root = Path(tmp)
        init_project(project_root)
        config_path = project_root / "nightshift.yaml"

        # NOTE(review): assumes the starter config written by init_project
        # contains "max_task_retries: 3" -- confirm against the init template.
        starter_text = config_path.read_text(encoding="utf-8")
        broken_text = starter_text.replace(
            "max_task_retries: 3",
            "max_task_retries: three",
        )
        config_path.write_text(broken_text, encoding="utf-8")

        with self.assertRaisesRegex(ConfigError, "pipeline.max_task_retries"):
            load_config(config_path)
def test_require_clean_worktree_must_be_boolean(self) -> None:
    """A non-boolean safety.require_clean_worktree is rejected while loading config."""
    with tempfile.TemporaryDirectory() as tmp:
        project_root = Path(tmp)
        init_project(project_root)
        config_path = project_root / "nightshift.yaml"

        # Swap the boolean for an arbitrary string in the starter config.
        starter_text = config_path.read_text(encoding="utf-8")
        broken_text = starter_text.replace(
            "require_clean_worktree: false",
            "require_clean_worktree: no-thanks",
        )
        config_path.write_text(broken_text, encoding="utf-8")

        with self.assertRaisesRegex(ConfigError, "safety.require_clean_worktree"):
            load_config(config_path)
def test_command_backend_agent_requires_command(self) -> None:
    """A command-backend agent without a command entry is rejected while loading."""
    with tempfile.TemporaryDirectory() as tmp:
        project_root = Path(tmp)
        init_project(project_root)
        config_path = project_root / "nightshift.yaml"

        # Drop the command line from the first matching agent only (count=1).
        starter_text = config_path.read_text(encoding="utf-8")
        broken_text = starter_text.replace(
            " command: echo\n system_prompt: agents/planner.md",
            " system_prompt: agents/planner.md",
            1,
        )
        config_path.write_text(broken_text, encoding="utf-8")

        with self.assertRaisesRegex(ConfigError, "must define command"):
            load_config(config_path)
def test_non_command_stage_cannot_define_commands(self) -> None:
    """A non-command stage that declares commands is rejected while loading."""
    with tempfile.TemporaryDirectory() as tmp:
        project_root = Path(tmp)
        init_project(project_root)
        config_path = project_root / "nightshift.yaml"

        # Attach a commands list to the first stage whose output is plan.md.
        starter_text = config_path.read_text(encoding="utf-8")
        broken_text = starter_text.replace(
            " output: plan.md",
            " output: plan.md\n commands:\n - python -m unittest",
            1,
        )
        config_path.write_text(broken_text, encoding="utf-8")

        with self.assertRaisesRegex(ConfigError, "non-command stage 'plan'"):
            load_config(config_path)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -126,6 +126,25 @@ class PipelineRunnerTests(unittest.TestCase):
self.assertIn("Retry limit reached", result.reason) self.assertIn("Retry limit reached", result.reason)
self.assertEqual([item.stage_id for item in result.stage_results], ["implement", "review", "implement", "review", "implement", "review"]) self.assertEqual([item.stage_id for item in result.stage_results], ["implement", "review", "implement", "review", "implement", "review"])
def test_stage_error_is_reported_as_failed_result(self) -> None:
    """A stage with an escaping output path yields a failed result, not an exception."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        _write_common_files(root)

        # The "../bad.md" output path points outside the task directory.
        bad_stage = StageConfig(
            id="plan", type="agent", agent="planner", output="../bad.md"
        )
        config = make_config(root, (bad_stage,))
        store = ArtifactStore(root, ".nightshift", run_id="test-run")
        runner = PipelineRunner(config, store)
        first_task = parse_tasks(TASK_MD)[0]

        result = runner.run_task(first_task)

        self.assertEqual(result.status, "failed")
        self.assertEqual(result.stage_results[0].status, "fail")
        # The final notes file must still be written for the failed task.
        final_notes = (
            root
            / ".nightshift"
            / "runs"
            / "test-run"
            / "tasks"
            / first_task.id
            / "final-notes.md"
        )
        self.assertTrue(final_notes.exists())
def _write_common_files(root: Path) -> None: def _write_common_files(root: Path) -> None:
(root / "nightshift.yaml").write_text("project:\n name: test\n", encoding="utf-8") (root / "nightshift.yaml").write_text("project:\n name: test\n", encoding="utf-8")