add integ runs, dynamic model choices, symantic search, better file creation, debugging agents

This commit is contained in:
K. Hodges 2026-05-20 02:36:23 -07:00
parent 05471e432e
commit 7c54050223
56 changed files with 2969 additions and 22 deletions

1
.gitignore vendored
View File

@ -52,6 +52,7 @@ coverage.xml
cover/
tiny-lisp-nightshift/
nightshift-imageboard/
integ_runs/
# Translations
*.mo

View File

@ -124,6 +124,36 @@ Other built-in real-model templates:
```bash
nightshift init --template real-simple --root bookmarks-demo
nightshift init --template real-long-running --root incident-service
nightshift init --template tutorial-pastebin --root nightshift-pastebin
```
Create an isolated integration sandbox for a template:
```bash
python -m nightshift.cli integ-run --template tutorial-pastebin
cd integ_runs/<timestamp>/project
```
Activate the generated virtual environment, then install and run the project.
PowerShell:
```powershell
..\.venv\Scripts\Activate.ps1
python -m pip install -e ..\..\..
python -m pip install -e . pytest flask
python -m nightshift.cli validate
python -m nightshift.cli run --task TASK-001
```
Bash:
```bash
source ../.venv/bin/activate
python -m pip install -e ../../..
python -m pip install -e . pytest flask
python -m nightshift.cli validate
python -m nightshift.cli run --task TASK-001
```
Open the read-only artifact dashboard:
@ -344,6 +374,7 @@ Additional docs:
- [Quickstart](QUICKSTART.md)
- [Tutorial 01: imageboard with real local models](examples/tutorial/01-imageboard/README.md)
- [Tutorial 02: Lisp with real local models](examples/tutorial/02-lisp/README.md)
- [Tutorial 03: Pastebin with model fallback and telemetry](examples/tutorial/03-pastebin/README.md)
- [Config reference](docs/config-reference.md)
- [Artifact review workflow](docs/artifact-review.md)
- [Troubleshooting](docs/troubleshooting.md)

View File

@ -1,12 +1,9 @@
# Bugfix TODO
## Some issues going with run --all
reason=Stage 'review' requested unknown next stage 'None'.
reason=Stage 'review' requested unknown next stage 'None'. Not every time. I think there's a pattern that is out of place here. Maybe it's related to the last task success? Or the last run?
## TASK-002 in imageboard tutorial tries to make an image
tries to make an image for a test, I dont think it should do that.
I think maybe some feature to create dummy files for tests would be useful
Or maybe we have a little library of dummy files for use? Not sure how it would get an image otherwise.
## Going from individual tasks to --all fails

View File

@ -39,6 +39,24 @@ planner:
system_prompt: agents/planner.md
```
Agent roles:
- `role: debugger` marks an agent as diagnosis-only. When a stage fails and a debugger is configured, NightShift sends the task, failed stage output, and retry history to that agent before the next retry.
Stage model routing:
```yaml
agent_pool:
- small-implementer
- larger-implementer
```
When `agent_pool` is set, NightShift uses the first agent initially and advances through the list as retry count increases. Each agent still owns its own backend, model, and temperature.
Telemetry:
NightShift writes `telemetry-summary.md` at both run and task scope. The summary estimates prompt/output tokens from captured prompts and responses, records stage runtime, retry count, status, agent id, and model, and groups success/failure statistics per model.
Ollama agent:
```yaml
@ -66,6 +84,7 @@ Patch validator stage options:
- `max_files`: max files changed.
- `max_lines`: max changed lines.
- `max_delete_ratio`: reject deletion-heavy patches above this deleted-line share, from `0.0` to `1.0`.
- `forbidden_paths`: paths the patch must not touch.
- Unified diff hunk line prefixes and hunk line counts are validated before patch apply.
- The patch normalizer recomputes hunk line counts from hunk bodies for direct unified diff output.
@ -82,3 +101,86 @@ Writer stages:
<complete file content>
```
````
Semantic context stage:
```yaml
- id: semantic_context
type: semantic_context
output: semantic-context.md
```
This stage builds a lightweight repository index of files, Python symbols, imports, and tests, then writes compact relevant snippets for the current task. It is keyword based with symbol-aware scoring, so it works without a vector database or network dependency.
## Failure, Retry, and Resource Artifacts
Failed command and validation stages write deterministic diagnostics under the task artifact directory:
- `diagnostics/<stage>-failure.md`: failure category, probable root cause, confidence, recommended next action, retry recommendation, modified files, and failing tests.
- `diagnostics/dependency-diagnostic.md`: Python missing-import and manifest guidance when the classifier detects dependency failures.
- `retry-memory.md`: compact summaries of previous attempts.
- `escalation-policy.md`: churn detection result and recommended escalation action.
- `resource-requests.md` plus `resources/`: generated run-local fixtures for supported blocked requests.
Agents can request generated run-local fixtures with a line like:
```text
blocked_request: json fixtures/input.json missing fixture for test
```
Supported fixture types are `png`, `jpg`, `json`, `sqlite`, `text`, and `blob`.
## Integration Runs
`nightshift integ-run` creates a timestamped directory under `integ_runs/` with an isolated virtual environment, initialized template project, logs, transcript, patch, and artifact directories. `integ_runs/` is ignored by git.
Create a local integration sandbox from the NightShift repository root:
```bash
python -m nightshift.cli integ-run --template tutorial-pastebin
```
Then enter the generated project:
```bash
cd integ_runs/<timestamp>/project
```
Activate the sandbox virtual environment and install target dependencies.
PowerShell:
```powershell
..\.venv\Scripts\Activate.ps1
python -m pip install -e ..\..\..
python -m pip install -e . pytest flask
```
Bash:
```bash
source ../.venv/bin/activate
python -m pip install -e ../../..
python -m pip install -e . pytest flask
```
Run NightShift inside the generated `project/` directory:
```bash
python -m nightshift.cli validate
python -m nightshift.cli run --task TASK-001
```
To clean up old sandboxes before creating a new one, keep only the newest three existing runs:
```bash
python -m nightshift.cli integ-run --template tutorial-pastebin --keep 3
```
## Pastebin Tutorial
`nightshift init --template tutorial-pastebin` creates a small Flask snippet-hosting target with deterministic tests and incremental NightShift tasks. Its pipeline includes semantic context retrieval, telemetry, debugger support, and implementation fallback order:
- `qwen2.5-coder:14b`
- `carstenuhlig/omnicoder-9b`
- `deepseek-coder-v2:16b`

View File

@ -868,7 +868,9 @@ NightShift currently provides:
* Ollama-backed local model agents through the local HTTP API
* OpenAI-compatible local/server model agents
* Per-agent temperature settings
* Cost, runtime, retry, and estimated token telemetry summaries
* Scoped repo lookup tools: `list_files`, `read_file`, and `grep`
* Lightweight semantic repository indexing for files, symbols, imports, tests, and compact task context
* Planner lookup requests, `files-inspected.md`, and planner reruns with retrieved context
* Project context chart generation
* Context pack generation
@ -895,9 +897,322 @@ NightShift currently provides:
* Final task notes, stage summaries, task completion artifacts, and run summaries
* Documentation for config, artifact review, troubleshooting, quickstart, and patch workflows
* A complete fake-agent patch-mode quickstart Lisp example under `examples/quickstart-lisp/`
* A deterministic pastebin tutorial template with model fallback configuration
The system remains sequential and local-first. It is designed to produce reviewable artifacts and repository state, not to deploy, push, or autonomously ship changes.
# 16.5 Current Tasks Todo
- [x] TASK-001: Failure classification pipeline
Dependencies:
- None
Description:
Add a deterministic post-failure analysis stage that runs after every failed command or test execution. The classifier should inspect stdout/stderr, exit codes, modified files, and failing tests, then categorize the failure and recommend the next orchestration action.
Acceptance Criteria:
- Captures stdout, stderr, exit code, modified files, and failing test names
- Produces structured output containing:
- failure category
- probable root cause
- confidence
- recommended next action
- retry recommendation
- Supports initial categories:
- syntax/import error
- missing dependency
- missing resource/fixture
- environment/config issue
- API misuse
- test expectation mismatch
- logic bug
- stuck/unclear
- Integrates into orchestration pipeline before retries occur
- Includes tests for classification behavior
- [x] TASK-002: Structured blocked/resource request system
Dependencies:
- TASK-001
Description:
Allow agents to explicitly declare missing resources or environmental requirements instead of endlessly retrying implementation attempts. Add structured "blocked" responses and runtime support for generating common fixtures and test resources.
Acceptance Criteria:
- Supports structured blocked responses such as:
- missing fixture
- missing config
- missing database
- missing asset
- Includes fixture generators for:
- PNG/JPG images
- JSON fixtures
- sqlite databases
- text/blob files
- Runtime can automatically satisfy supported requests
- Generated fixtures are isolated to the active run directory
- Includes tests for fixture generation and blocked flow handling
- [x] TASK-003: Dedicated debugger agent role
Dependencies:
- TASK-001
Description:
Introduce a dedicated debugger agent responsible for diagnosis rather than implementation. The debugger reviews failed attempts and provides concise explanations and recommendations for the implementer.
Acceptance Criteria:
- Debugger receives:
- task description
- current patch
- failure output
- recent attempt history
- Debugger outputs:
- concise diagnosis
- recommended next action
- "do not modify" guidance
- Debugger does not directly modify code initially
- Implementer receives debugger output in retry context
- Includes tests for debugger orchestration behavior
- [x] TASK-004: Stuck detection and escalation policy engine
Dependencies:
- TASK-001
- TASK-003
Description:
Detect retry churn loops and automatically escalate to different models, debugger review, or human intervention when progress stalls.
Acceptance Criteria:
- Tracks:
- repeated failures
- repeated file edits
- unchanged failing tests
- expanding diff size
- oscillating implementations
- Supports configurable retry budgets
- Supports escalation policies such as:
- debugger review
- larger local model
- cloud model
- human review
- Stops infinite retry loops
- Includes tests for churn detection and escalation behavior
- [x] TASK-005: Multi-model orchestration and escalation
Dependencies:
- TASK-004
Description:
Add support for multiple implementation and debugging models with configurable routing, retry budgets, and escalation rules. Provide examples
Acceptance Criteria:
- Supports separate model pools for:
- implementers
- debuggers
- escalation models
- Allows configurable retry budgets per model
- Supports configurable temperatures per role
- Allows fallback ordering between models
- Integrates with escalation policy engine
- Includes tests for model routing and escalation flow
- [x] TASK-006: Dependency management agent
Dependencies:
- TASK-001
Description:
Add a dependency management subsystem capable of detecting missing packages, understanding dependency manifests, and automatically resolving installation issues. Just for python now.
Acceptance Criteria:
- Detects:
- missing imports
- missing packages
- dependency manifest drift
- invalid package references
- Supports:
- pip
- uv
- poetry
- requirements.txt
- pyproject.toml
- Can propose or apply dependency fixes
- Can retry runs after dependency installation
- Includes tests for dependency resolution flows
- [x] TASK-007: Patch governor and diff safety system
Dependencies:
- TASK-004
Description:
Prevent runaway architectural rewrites and unrelated modifications during retry loops by analyzing diffs and rejecting unsafe patches.
Acceptance Criteria:
- Detects:
- unrelated file modifications
- excessive diff growth
- deletion-heavy patches
- architecture drift
- Can reject unsafe patches before commit/application
- Produces actionable rejection feedback for implementers
- Supports configurable thresholds and policies
- Includes tests for diff analysis and patch rejection behavior
- [x] TASK-008: Integration sandbox runner
Dependencies:
- None
Description:
Add a one-command integration environment runner that creates isolated timestamped run directories for NightShift testing and orchestration experiments. This is the equivalent of doing --template with the tutorials
Acceptance Criteria:
- Adds command:
- `nightshift integ-run`
- Creates timestamped run directories under:
- `integ_runs/`
- Automatically:
- creates isolated venv
- installs project dependencies
- initializes clean template/project state
- Adds `integ_runs/` to `.gitignore`
- Persists:
- logs
- transcripts
- patches
- generated artifacts
- Supports cleanup policies for old runs
- Includes tests for sandbox creation and cleanup behavior
- [x] TASK-009: Structured retry memory system
Dependencies:
- TASK-001
- TASK-004
Description:
Persist compact structured summaries of previous attempts to prevent retry amnesia and repeated failed approaches.
Acceptance Criteria:
- Stores:
- attempted fixes
- failure causes
- rejected hypotheses
- successful observations
- Produces compact retry summaries instead of raw log dumps
- Retry summaries are injected into implementer context
- Supports configurable memory compaction
- Includes tests for retry memory summarization behavior
- [x] TASK-010: Environment-aware execution diagnostics
Dependencies:
- TASK-001
- TASK-006
Description:
Improve orchestration awareness of environment-level failures versus implementation-level failures to reduce wasted retries and false debugging paths.
Acceptance Criteria:
- Distinguishes:
- environment failures
- dependency failures
- fixture/resource failures
- implementation logic failures
- Prevents implementation retries when environment is invalid
- Surfaces actionable remediation guidance
- Integrates with failure classifier and dependency manager
- Includes tests for environment diagnostic behavior
- [x] TASK-011: Update tutorials to reflect the previous changes to the templates as needed
Description:
Tutorials should have the newly added features when relevant.,
Acceptance Criteria:
- Tutorials have features
- [x] TASK-012: Stage output should be more organized. Right now run/task/ produces many files and it is difficult to keep track of. Either sub folders for retries, appending for retries, or compacting, whichever makes sense for our use case.
- [x] TASK-013: Cost, token, and runtime telemetry
Dependencies:
- TASK-005
Description:
Track orchestration cost, latency, retry counts, token usage, and success rates across agents and models. Generally telemetry for analyzing model efficiency and usage. Which model fixes bugs fastest?
Acceptance Criteria:
- Tracks token usage per agent and run
- Tracks runtime duration and retry counts
- Records success/failure metrics
- Supports per-model statistics
- Exposes telemetry summaries and reports
- Includes tests for telemetry aggregation
- [x] TASK-014: Repository semantic indexing system
Dependencies:
- None
Description:
Build lightweight semantic indexing over repositories so agents can retrieve relevant files, symbols, tests, and architecture context without loading excessive raw context.
Acceptance Criteria:
- Indexes symbols, files, imports, and tests
- Supports semantic and keyword search
- Returns compact relevant context snippets
- Reduces prompt context size
- Includes tests for retrieval quality
- [x] TASK-015: Pastebin tutorial project template
Dependencies:
- TASK-008
- TASK-005
Description:
Add a new tutorial project template for NightShift based on a small Pastebin/snippet-hosting service. This should work like the existing imageboard tutorial, but be simpler, more deterministic, and easier to use for testing agent orchestration. The template should be creatable with `--template`.
Acceptance Criteria:
- Adds a new template named `pastebin`
- Supports creating the tutorial project with a command such as:
- `nightshift init --template tutorial-pastebin`
- Template includes a small but realistic app with:
- snippet creation
- snippet viewing
- snippet listing
- optional expiration field
- tags or language field
- basic search/filtering
- Includes a test suite with multiple incremental tasks suitable for agent testing
- Avoids complex media/file-upload behavior from the imageboard tutorial
- Uses deterministic fixtures and simple dependencies
- Includes clear task descriptions for the agent to complete
- Includes README instructions explaining the tutorial goals
- Supports model fallback ordering for this template:
- `qwen2.5-coder:14b`
- `carstenuhlig/omnicoder-9b`
- `deepseek-coder-v2:16b`
- If the first model fails or exceeds its retry budget, the next fallback model is attempted
- Records which model handled each attempt
- Includes tests for template creation and model fallback configuration
---
# 17. Current Product Shape

View File

@ -0,0 +1,124 @@
# Tutorial 03: Pastebin With Model Fallback And Telemetry
This tutorial uses the `tutorial-pastebin` template: a small Flask snippet-hosting service designed for deterministic NightShift orchestration tests.
It is intentionally simpler than the imageboard tutorial. There are no uploads, thumbnails, sessions, or moderation queues. The work is ordinary web-app behavior: snippet creation, viewing, listing, filtering, expiration handling, and simple HTML forms.
## What The Template Creates
Run this from a disposable parent directory:
```bash
nightshift init --template tutorial-pastebin --root nightshift-pastebin
cd nightshift-pastebin
```
For an isolated local integration run, use the integration sandbox command from the NightShift repository root:
```bash
python -m nightshift.cli integ-run --template tutorial-pastebin
cd integ_runs/<timestamp>/project
```
Activate the generated virtual environment.
PowerShell:
```powershell
..\.venv\Scripts\Activate.ps1
python -m pip install -e ..\..\..
```
Bash:
```bash
source ../.venv/bin/activate
python -m pip install -e ../../..
```
The template creates:
```text
nightshift.yaml
.nightshift/
agents/
planner.md
implementer.md
debugger.md
reviewer.md
tasks.md
src/
pastebin_app/
templates/
tests/
pyproject.toml
README.md
```
The template includes a working baseline Flask app and deterministic pytest suite. NightShift tasks then extend or verify app behavior in small increments.
## Prerequisites
Install NightShift from this repository:
```bash
python -m pip install -e .
```
Install target dependencies:
```bash
python -m pip install -e . pytest flask
```
Install and start Ollama, then pull the fallback models you want available:
```bash
ollama pull qwen2.5-coder:14b
ollama pull carstenuhlig/omnicoder-9b
ollama pull deepseek-coder-v2:16b
ollama list
```
NightShift uses Ollama's local HTTP API, normally at `http://localhost:11434`.
## Model Fallback
The template's implementation stage uses this fallback order:
1. `qwen2.5-coder:14b`
2. `carstenuhlig/omnicoder-9b`
3. `deepseek-coder-v2:16b`
NightShift records which agent/model handled each stage in `telemetry-summary.md`.
## Task Plan
The template writes the full task list to `.nightshift/tasks.md`. A copy is included here as [tasks.md](tasks.md).
1. Snippet creation and viewing
2. Snippet listing and filtering
3. Expiration handling
4. HTML forms and templates
Run one task first:
```bash
python -m nightshift.cli validate
python -m nightshift.cli run --task TASK-001
```
Then inspect:
```text
.nightshift/runs/<run-id>/devlog.md
.nightshift/runs/<run-id>/telemetry-summary.md
.nightshift/runs/<run-id>/tasks/TASK-001/semantic-context.md
.nightshift/runs/<run-id>/tasks/TASK-001/telemetry-summary.md
.nightshift/runs/<run-id>/tasks/TASK-001/artifact-index.md
.nightshift/runs/<run-id>/tasks/TASK-001/test-output.txt
```
## Pipeline Reference
A copy of the template pipeline is included here as [nightshift.yaml](nightshift.yaml). The canonical runnable template lives under `nightshift/project_templates/tutorial-pastebin/`.

View File

@ -0,0 +1,124 @@
project:
name: pastebin
root: .
task_file: .nightshift/tasks.md
artifact_dir: .nightshift
safety:
require_clean_worktree: false
scoped_paths:
- src
- tests
- templates
- pyproject.toml
- README.md
allowed_commands:
- python -m pytest -q
forbidden_commands:
- rm -rf
- git push
- curl | bash
experiment:
label: pastebin-model-fallback
prompt_variant: qwen-omnicoder-deepseek-v1
agents:
planner:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.2
system_prompt: .nightshift/agents/planner.md
implementer_qwen:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
implementer_omnicoder:
backend: ollama
model: carstenuhlig/omnicoder-9b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
implementer_deepseek:
backend: ollama
model: deepseek-coder-v2:16b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
reviewer:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
pipeline:
max_task_retries: 3
continue_on_task_failure: false
stages:
- id: plan
type: agent
agent: planner
output: plan.md
- id: semantic_context
type: semantic_context
output: semantic-context.md
- id: context
type: repo_context
output: context-pack.md
- id: implement
type: file_writer
agent_pool:
- implementer_qwen
- implementer_omnicoder
- implementer_deepseek
output: proposed.patch
- id: normalize
type: patch_normalizer
output: normalized.patch
- id: validate_patch
type: patch_validator
output: patch-validation.md
max_files: 12
max_lines: 900
max_delete_ratio: 0.70
on_fail: implement
- id: apply_patch
type: patch_apply
mode: apply
output: patch-apply-output.txt
on_fail: implement
- id: test
type: command
commands:
- python -m pytest -q
output: test-output.txt
shell: true
timeout_seconds: 25
on_fail: implement
- id: review
type: agent_review
agent: reviewer
output: review.md
on_fail: implement
- id: summarize
type: summarize
output: final-notes.md

View File

@ -0,0 +1,57 @@
# Pastebin Tutorial Tasks
- [ ] TASK-001: Snippet creation and viewing
Description:
Complete the pastebin service foundation. Support creating snippets with title, body, optional language, optional tags, and optional expiration date. Support viewing a single snippet by id.
Acceptance Criteria:
- POST `/snippets` creates a snippet with title and body
- GET `/snippets/<id>` returns the snippet
- Optional language, tags, and expires_at fields are persisted
- Tests cover creation and viewing
- [ ] TASK-002: Snippet listing and filtering
Dependencies:
- TASK-001
Description:
Add snippet listing with newest-first ordering and deterministic search/filter behavior.
Acceptance Criteria:
- GET `/snippets` lists snippets newest first
- `q` filters by title or body text
- `language` filters by language
- `tag` filters by tag
- Tests cover listing, search, and filters
- [ ] TASK-003: Expiration handling
Dependencies:
- TASK-002
Description:
Hide expired snippets from list/search results while keeping direct lookup behavior explicit.
Acceptance Criteria:
- Expired snippets are excluded from GET `/snippets`
- Direct lookup of an expired snippet returns 410
- Non-expiring snippets remain visible
- Tests cover expired and active snippets
- [ ] TASK-004: HTML forms and templates
Dependencies:
- TASK-003
Description:
Add simple HTML pages for creating, listing, filtering, and viewing snippets.
Acceptance Criteria:
- GET `/` shows the snippet list
- GET `/new` shows a creation form
- Creating a snippet redirects to the snippet view
- Templates expose language, tags, and expiration fields
- Tests cover HTML response status and redirects

View File

@ -9,6 +9,7 @@ import sys
from .config import validate_config
from .errors import NightShiftError
from .init import available_templates, init_project
from .integ import create_integration_run
from .pipeline import PipelineRunner
from .runlog import RunLogger
from .status import build_status, format_status
@ -55,6 +56,16 @@ def build_parser() -> argparse.ArgumentParser:
web_parser.add_argument("--host", default="127.0.0.1", help="Host to bind.")
web_parser.add_argument("--port", type=int, default=8765, help="Port to bind.")
integ_parser = subparsers.add_parser("integ-run", help="Create an isolated integration run directory.")
integ_parser.add_argument("--root", default=".", help="Repository root where integ_runs/ is created.")
integ_parser.add_argument(
"--template",
default="basic",
choices=available_templates(),
help="Template to initialize inside the sandbox.",
)
integ_parser.add_argument("--keep", type=int, help="Keep only the newest N old integration runs before creating a new one.")
return parser
@ -122,6 +133,13 @@ def main(argv: list[str] | None = None) -> int:
app.run(host=args.host, port=args.port)
return 0
if args.command == "integ-run":
run = create_integration_run(Path(args.root), template=args.template, keep=args.keep)
print(f"Integration run: {run.directory}")
print(f"Venv: {run.venv_dir}")
print(f"Log: {run.log_path}")
return 0
except NightShiftError as exc:
print(str(exc), file=sys.stderr)
return 1

View File

@ -53,6 +53,7 @@ class StageConfig:
id: str
type: str
agent: str | None = None
agent_pool: tuple[str, ...] = ()
commands: tuple[str, ...] = ()
output: str | None = None
on_fail: str | None = None
@ -61,6 +62,7 @@ class StageConfig:
working_dir: Path | None = None
max_files: int | None = None
max_lines: int | None = None
max_delete_ratio: float | None = None
forbidden_paths: tuple[str, ...] = ()
mode: str | None = None
@ -97,6 +99,7 @@ SUPPORTED_STAGE_TYPES = AGENT_STAGE_TYPES | COMMAND_STAGE_TYPES | {
"patch_apply",
"patch_validator",
"repo_context",
"semantic_context",
"summarize",
}
@ -286,6 +289,7 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
)
agent = _optional_string(stage_raw.get("agent"), f"{stage_context}.agent")
agent_pool = _string_tuple(stage_raw.get("agent_pool", []), f"{stage_context}.agent_pool")
commands = _string_tuple(stage_raw.get("commands", []), f"{stage_context}.commands")
timeout_seconds = _optional_int_or_none(
stage_raw.get("timeout_seconds"),
@ -296,33 +300,48 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
working_dir_raw = _optional_string(stage_raw.get("working_dir"), f"{stage_context}.working_dir")
max_files = _optional_int_or_none(stage_raw.get("max_files"), f"{stage_context}.max_files")
max_lines = _optional_int_or_none(stage_raw.get("max_lines"), f"{stage_context}.max_lines")
max_delete_ratio = _optional_float_or_none(
stage_raw.get("max_delete_ratio"),
f"{stage_context}.max_delete_ratio",
)
if max_files is not None and max_files <= 0:
raise ConfigError(f"Config error: {stage_context}.max_files must be greater than zero.")
if max_lines is not None and max_lines <= 0:
raise ConfigError(f"Config error: {stage_context}.max_lines must be greater than zero.")
if max_delete_ratio is not None and not 0 <= max_delete_ratio <= 1:
raise ConfigError(f"Config error: {stage_context}.max_delete_ratio must be between 0 and 1.")
mode = _optional_string(stage_raw.get("mode"), f"{stage_context}.mode")
if stage_type == "patch_apply" and mode not in {None, "dry_run", "apply"}:
raise ConfigError(
f"Config error: {stage_context}.mode must be 'dry_run' or 'apply'."
)
effective_agent = agent or (agent_pool[0] if agent_pool else None)
if stage_type in AGENT_STAGE_TYPES:
if agent is None:
if effective_agent is None:
raise ConfigError(f"Config error: agent stage '{stage_id}' must reference an agent.")
if agent not in agents:
if effective_agent not in agents:
defined = ", ".join(sorted(agents))
raise ConfigError(
f"Config error: pipeline stage '{stage_id}' references unknown agent "
f"'{agent}'. Defined agents: {defined}."
f"'{effective_agent}'. Defined agents: {defined}."
)
if stage_type in {"code_writer", "file_writer"}:
if agent is None:
if effective_agent is None:
raise ConfigError(f"Config error: {stage_type} stage '{stage_id}' must reference an agent.")
if agent not in agents:
if effective_agent not in agents:
defined = ", ".join(sorted(agents))
raise ConfigError(
f"Config error: pipeline stage '{stage_id}' references unknown agent "
f"'{agent}'. Defined agents: {defined}."
f"'{effective_agent}'. Defined agents: {defined}."
)
for pooled_agent in agent_pool:
if pooled_agent not in agents:
defined = ", ".join(sorted(agents))
raise ConfigError(
f"Config error: pipeline stage '{stage_id}' references unknown pooled agent "
f"'{pooled_agent}'. Defined agents: {defined}."
)
if stage_type == "patch_normalizer" and agent is not None and agent not in agents:
defined = ", ".join(sorted(agents))
@ -342,7 +361,8 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
StageConfig(
id=stage_id,
type=stage_type,
agent=agent,
agent=effective_agent,
agent_pool=agent_pool,
commands=commands,
output=_optional_string(stage_raw.get("output"), f"{stage_context}.output"),
on_fail=_optional_string(stage_raw.get("on_fail"), f"{stage_context}.on_fail"),
@ -351,6 +371,7 @@ def parse_config(raw: dict[str, Any], config_path: Path) -> NightShiftConfig:
working_dir=Path(working_dir_raw) if working_dir_raw else None,
max_files=max_files,
max_lines=max_lines,
max_delete_ratio=max_delete_ratio,
forbidden_paths=_string_tuple(
stage_raw.get("forbidden_paths", []),
f"{stage_context}.forbidden_paths",

View File

@ -0,0 +1,57 @@
"""Python dependency diagnostics."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import re
@dataclass(frozen=True)
class DependencyDiagnostic:
missing_imports: tuple[str, ...]
manifests: tuple[str, ...]
recommendation: str
def diagnose_python_dependencies(project_root: Path, failure_output: str) -> DependencyDiagnostic:
imports = []
for match in re.finditer(r"No module named ['\"]([^'\"]+)['\"]", failure_output):
name = match.group(1).split(".")[0]
if name not in imports:
imports.append(name)
manifests = tuple(
relative
for relative in ("pyproject.toml", "requirements.txt", "poetry.lock", "uv.lock")
if (project_root / relative).exists()
)
if not imports:
recommendation = "No missing Python import was detected."
elif "pyproject.toml" in manifests:
recommendation = "Add the missing package to pyproject.toml, then install with the configured tool."
elif "requirements.txt" in manifests:
recommendation = "Add the missing package to requirements.txt, then run pip install -r requirements.txt."
else:
recommendation = "Create a Python dependency manifest or install the missing package in the active environment."
return DependencyDiagnostic(tuple(imports), manifests, recommendation)
def format_dependency_diagnostic(diagnostic: DependencyDiagnostic) -> str:
imports = "\n".join(f"- `{name}`" for name in diagnostic.missing_imports) or "- None"
manifests = "\n".join(f"- `{name}`" for name in diagnostic.manifests) or "- None"
return "\n".join(
[
"# Dependency Diagnostic",
"",
"## Missing Imports",
"",
imports,
"",
"## Manifests",
"",
manifests,
"",
f"Recommendation: {diagnostic.recommendation}",
"",
]
)

40
nightshift/escalation.py Normal file
View File

@ -0,0 +1,40 @@
"""Retry churn detection and escalation policy helpers."""
from __future__ import annotations
from dataclasses import dataclass
from .retry_memory import RetryMemoryEntry
@dataclass(frozen=True)
class EscalationDecision:
should_stop: bool
action: str
reason: str
def evaluate_retry_churn(entries: tuple[RetryMemoryEntry, ...], *, retry_budget: int) -> EscalationDecision:
if len(entries) < 2:
return EscalationDecision(False, "continue", "Not enough retry history for churn detection.")
recent = entries[-3:]
same_stage = len({entry.stage_id for entry in recent}) == 1
same_cause = len({entry.cause for entry in recent}) == 1
if len(entries) >= retry_budget and retry_budget > 0:
return EscalationDecision(True, "human review", "Configured retry budget is exhausted.")
if len(recent) == 3 and same_stage and same_cause:
return EscalationDecision(True, "debugger review or larger model", "The same stage is failing with the same reason repeatedly.")
return EscalationDecision(False, "continue", "No retry churn detected.")
def format_escalation_decision(decision: EscalationDecision) -> str:
return "\n".join(
[
"# Escalation Policy",
"",
f"Action: {decision.action}",
f"Stop retries: {str(decision.should_stop).lower()}",
f"Reason: {decision.reason}",
"",
]
)

154
nightshift/failures.py Normal file
View File

@ -0,0 +1,154 @@
"""Deterministic failure classification helpers."""
from __future__ import annotations
from dataclasses import dataclass
import re
FAILURE_CATEGORIES = (
"syntax/import error",
"missing dependency",
"missing resource/fixture",
"environment/config issue",
"API misuse",
"test expectation mismatch",
"logic bug",
"stuck/unclear",
)
@dataclass(frozen=True)
class FailureClassification:
category: str
probable_root_cause: str
confidence: float
recommended_next_action: str
retry_recommendation: str
failing_tests: tuple[str, ...] = ()
def classify_failure(output: str, exit_code: int | None = None, modified_files: tuple[str, ...] = ()) -> FailureClassification:
"""Classify command/test output with deterministic rules."""
text = output or ""
lowered = text.lower()
failing_tests = extract_failing_tests(text)
if re.search(r"\b(syntaxerror|indentationerror|importerror)\b", text, re.IGNORECASE):
return FailureClassification(
"syntax/import error",
"Python failed while parsing or importing code.",
0.86,
"Send the failure excerpt and touched files back to the implementer.",
"retry implementation",
failing_tests,
)
missing = re.search(r"No module named ['\"]([^'\"]+)['\"]", text, re.IGNORECASE)
if not missing:
missing = re.search(r"ModuleNotFoundError:\s*['\"]?([A-Za-z0-9_.-]+)", text, re.IGNORECASE)
if missing:
package = missing.group(1) or "unknown package"
return FailureClassification(
"missing dependency",
f"Runtime cannot import required package `{package}`.",
0.91,
"Run dependency diagnostics before another implementation retry.",
"do not retry implementation until dependency is resolved",
failing_tests,
)
if any(marker in lowered for marker in ("filenotfounderror", "no such file or directory", "missing fixture", "fixture")):
return FailureClassification(
"missing resource/fixture",
"The run appears to depend on a fixture or resource that is not present.",
0.78,
"Generate or request the missing fixture, then rerun validation.",
"retry after resource remediation",
failing_tests,
)
if any(marker in lowered for marker in ("permission denied", "environment variable", "config error", "not configured", "connection refused")):
return FailureClassification(
"environment/config issue",
"The execution environment or configuration is invalid.",
0.76,
"Surface remediation guidance and stop implementation retries.",
"do not retry implementation",
failing_tests,
)
if any(marker in lowered for marker in ("typeerror", "attributeerror", "unexpected keyword", "has no attribute")):
return FailureClassification(
"API misuse",
"The implementation is calling an API with an incompatible shape.",
0.72,
"Retry implementation with the exception and relevant call site.",
"retry implementation",
failing_tests,
)
if any(marker in lowered for marker in ("assertionerror", "assert ", "expected", " != ", " == ")) or failing_tests:
return FailureClassification(
"test expectation mismatch",
"Tests ran and reported mismatched expected behavior.",
0.7,
"Retry implementation with the failing test names and assertion excerpt.",
"retry implementation",
failing_tests,
)
if exit_code not in (None, 0):
category = "logic bug" if modified_files else "stuck/unclear"
return FailureClassification(
category,
"The command failed without a more specific deterministic signature.",
0.45,
"Use debugger review or compact failure output before retrying.",
"retry with debugger guidance",
failing_tests,
)
return FailureClassification(
"stuck/unclear",
"No failure signature was found.",
0.2,
"Inspect the full stage artifact.",
"manual review",
failing_tests,
)
def extract_failing_tests(output: str) -> tuple[str, ...]:
tests: list[str] = []
patterns = (
r"FAILED\s+([^\s]+::[^\s]+)",
r"ERROR\s+([^\s]+::[^\s]+)",
r"def\s+(test_[A-Za-z0-9_]+)\(",
)
for pattern in patterns:
for match in re.finditer(pattern, output):
name = match.group(1).strip()
if name not in tests:
tests.append(name)
return tuple(tests)
def format_failure_classification(result: FailureClassification, *, exit_code: int | None, modified_files: tuple[str, ...]) -> str:
files = "\n".join(f"- `{path}`" for path in modified_files) or "- None"
tests = "\n".join(f"- `{name}`" for name in result.failing_tests) or "- None"
return "\n".join(
[
"# Failure Analysis",
"",
f"Failure category: {result.category}",
f"Probable root cause: {result.probable_root_cause}",
f"Confidence: {result.confidence:.2f}",
f"Recommended next action: {result.recommended_next_action}",
f"Retry recommendation: {result.retry_recommendation}",
f"Exit code: {exit_code if exit_code is not None else ''}",
"",
"## Modified Files",
"",
files,
"",
"## Failing Tests",
"",
tests,
"",
]
)

View File

@ -15,6 +15,7 @@ STARTER_FILES = {
"agents/planner.md": templates.PLANNER_PROMPT,
"agents/implementer.md": templates.IMPLEMENTER_PROMPT,
"agents/reviewer.md": templates.REVIEWER_PROMPT,
"agents/debugger.md": templates.DEBUGGER_PROMPT,
}
IMAGEBOARD_FILES = {
@ -23,6 +24,7 @@ IMAGEBOARD_FILES = {
".nightshift/agents/planner.md": templates.REAL_MODEL_PLANNER_PROMPT,
".nightshift/agents/implementer.md": templates.REAL_MODEL_IMPLEMENTER_PROMPT,
".nightshift/agents/reviewer.md": templates.REAL_MODEL_REVIEWER_PROMPT,
".nightshift/agents/debugger.md": templates.REAL_MODEL_DEBUGGER_PROMPT,
"README.md": templates.IMAGEBOARD_README,
"src/imageboard/.gitkeep": "",
"tests/.gitkeep": "",

66
nightshift/integ.py Normal file
View File

@ -0,0 +1,66 @@
"""Integration sandbox runner."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
import shutil
import venv
from .init import init_project
@dataclass(frozen=True)
class IntegrationRun:
directory: Path
venv_dir: Path
log_path: Path
def create_integration_run(root: Path, *, template: str = "basic", keep: int | None = None) -> IntegrationRun:
base = root.resolve() / "integ_runs"
base.mkdir(parents=True, exist_ok=True)
if keep is not None:
cleanup_integration_runs(base, keep=keep)
run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S.%fZ")
run_dir = base / run_id
run_dir.mkdir()
log_dir = run_dir / "logs"
transcript_dir = run_dir / "transcripts"
patch_dir = run_dir / "patches"
artifact_dir = run_dir / "artifacts"
for directory in (log_dir, transcript_dir, patch_dir, artifact_dir):
directory.mkdir()
venv_dir = run_dir / ".venv"
venv.EnvBuilder(with_pip=True).create(venv_dir)
project_dir = run_dir / "project"
project_dir.mkdir()
init_project(project_dir, template=template)
log_path = log_dir / "integ-run.log"
log_path.write_text(
"\n".join(
[
"# Integration Run",
"",
f"Template: {template}",
f"Project: {project_dir}",
f"Venv: {venv_dir}",
"Dependencies: project installation is left to the operator or command stages.",
"",
]
),
encoding="utf-8",
)
return IntegrationRun(run_dir, venv_dir, log_path)
def cleanup_integration_runs(base: Path, *, keep: int) -> tuple[Path, ...]:
if keep < 0:
raise ValueError("keep must be zero or greater")
runs = sorted((path for path in base.iterdir() if path.is_dir()), key=lambda path: path.name, reverse=True)
removed: list[Path] = []
for path in runs[keep:]:
shutil.rmtree(path)
removed.append(path)
return tuple(removed)

View File

@ -142,6 +142,7 @@ def validate_patch(
safety: SafetyConfig,
max_files: int = DEFAULT_MAX_FILES,
max_changed_lines: int = DEFAULT_MAX_CHANGED_LINES,
max_delete_ratio: float | None = None,
forbidden_paths: tuple[str, ...] = DEFAULT_FORBIDDEN_PATHS,
) -> PatchValidationResult:
root = resolve_project_root(project_root)
@ -153,12 +154,18 @@ def validate_patch(
raise PipelineError(f"Patch validation failed: touches {len(files)} files, max is {max_files}.")
changed_lines = _changed_line_count(patch)
deleted_lines = _deleted_line_count(patch)
if changed_lines <= 0:
raise PipelineError("Patch validation failed: patch has no changed lines.")
if changed_lines > max_changed_lines:
raise PipelineError(
f"Patch validation failed: changes {changed_lines} lines, max is {max_changed_lines}."
)
if max_delete_ratio is not None and changed_lines > 0 and deleted_lines / changed_lines > max_delete_ratio:
raise PipelineError(
"Patch validation failed: deletion-heavy patch exceeds "
f"max_delete_ratio {max_delete_ratio:.2f}."
)
for path_text in files:
_validate_patch_path(path_text, root, scoped_roots, forbidden_paths)
@ -427,6 +434,21 @@ def _changed_line_count(patch: str) -> int:
return count
def _deleted_line_count(patch: str) -> int:
count = 0
in_hunk = False
for line in patch.splitlines():
if line.startswith("diff --git "):
in_hunk = False
continue
if line.startswith("@@"):
in_hunk = True
continue
if in_hunk and line.startswith("-") and not line.startswith("---"):
count += 1
return count
def _validate_patch_path(
path_text: str,
root: Path,

View File

@ -5,14 +5,18 @@ from __future__ import annotations
from dataclasses import dataclass, replace
from pathlib import Path
import re
import subprocess
from .agents import AgentExecutor
from .artifacts import ArtifactStore
from .commands import CommandExecutor
from .config import COMMAND_STAGE_TYPES, NightShiftConfig, StageConfig
from .context import ContextManager
from .dependencies import diagnose_python_dependencies, format_dependency_diagnostic
from .escalation import evaluate_retry_churn, format_escalation_decision
from .errors import PipelineError
from .errors import NightShiftError
from .failures import classify_failure, format_failure_classification
from .git import ensure_clean_worktree, write_diff_artifact, write_git_artifacts
from .patches import (
DEFAULT_FORBIDDEN_PATHS,
@ -30,9 +34,18 @@ from .patches import (
from .project_chart import build_project_context_chart
from .reports import ReportGenerator
from .repo_tools import RepoTools, extract_agent_stdout, parse_lookup_requests
from .resources import format_resource_report, parse_resource_requests, satisfy_resource_requests
from .retry_memory import RetryMemoryEntry, entry_from_stage, summarize_retry_memory
from .semantic_index import (
build_semantic_index,
format_search_results,
format_semantic_index,
search_index,
)
from .runlog import RunLogger
from .stages import StageResult
from .tasks import Task, mark_task_completed
from .telemetry import TelemetryEntry, format_telemetry_summary, telemetry_from_stage_output
@dataclass(frozen=True)
@ -126,6 +139,8 @@ class PipelineRunner:
stage_results: list[StageResult] = []
previous_outputs: dict[str, str] = {}
retry_notes: list[str] = []
retry_memory: list[RetryMemoryEntry] = []
telemetry_entries: list[TelemetryEntry] = []
retry_count = 0
index = 0
final_status = "complete"
@ -160,6 +175,8 @@ class PipelineRunner:
if stage.id in previous_outputs:
del previous_outputs[stage.id]
previous_outputs[stage.id] = self._read_output(result.output_path)
telemetry_entries.append(self._telemetry_entry(stage, result, retry_count))
self._write_telemetry(task.id, telemetry_entries)
self.logger.event(
"stage.finish",
"Finished stage",
@ -195,6 +212,12 @@ class PipelineRunner:
continue
target_stage = stage.on_fail or result.next_stage
analysis_note = self._write_failure_diagnostics(stage, task, result, retry_count)
if analysis_note:
retry_notes.append(analysis_note)
debugger_note = self._run_debugger_if_configured(task, result, retry_notes)
if debugger_note:
retry_notes.append(debugger_note)
if target_stage:
if retry_count >= self.config.pipeline.max_task_retries:
final_status = "failed"
@ -209,6 +232,26 @@ class PipelineRunner:
)
break
retry_count += 1
memory_entry = entry_from_stage(retry_count, result, target_stage)
retry_memory.append(memory_entry)
self.artifacts.write_stage_output(
task.id,
"retry-memory.md",
summarize_retry_memory(tuple(retry_memory)),
)
decision = evaluate_retry_churn(
tuple(retry_memory),
retry_budget=self.config.pipeline.max_task_retries + 1,
)
self.artifacts.write_stage_output(
task.id,
"escalation-policy.md",
format_escalation_decision(decision),
)
if decision.should_stop:
final_status = "failed"
final_reason = f"Escalation policy stopped retries: {decision.reason}"
break
self.logger.event(
"stage.retry",
"Redirecting after stage result",
@ -260,6 +303,7 @@ class PipelineRunner:
stage_results,
context_out_path=context_out_path,
)
self._write_telemetry(task.id, telemetry_entries)
self.logger.event(
"task.finish",
"Finished task",
@ -361,7 +405,7 @@ class PipelineRunner:
if stage.type in {"agent", "agent_review", "review"}:
context = self.context.read_context(task, retry_notes)
result = self.agent_executor.run_stage(
stage,
self._stage_for_retry_agent(stage, retry_count),
task,
previous_outputs,
retry_notes,
@ -370,6 +414,9 @@ class PipelineRunner:
retry_context=context.retry_context,
)
if stage.type == "agent":
resource_result = self._maybe_satisfy_resource_request(stage, task, result)
if resource_result is not None:
return resource_result
return self._maybe_rerun_agent_with_repo_lookup(
stage,
task,
@ -412,6 +459,33 @@ class PipelineRunner:
reason="Context pack written.",
output_path=str(output_path.relative_to(self.config.project.root)),
)
if stage.type == "semantic_context":
index = build_semantic_index(self.config.project.root, self.config.safety)
index_path = self.artifacts.write_stage_output(
task.id,
"semantic-index.md",
format_semantic_index(index),
)
query = " ".join([task.title, task.description, *task.acceptance_criteria])
context_path = self.artifacts.write_stage_output(
task.id,
stage.output or "semantic-context.md",
format_search_results(search_index(index, query, limit=8), query),
)
self.logger.event(
"artifact.write",
"Wrote semantic context",
stage_id=stage.id,
task_id=task.id,
artifact_path=context_path.relative_to(self.config.project.root),
)
return StageResult(
stage_id=stage.id,
status="pass",
reason="Semantic context written.",
output_path=str(context_path.relative_to(self.config.project.root)),
context_update=f"Semantic index: {index_path.relative_to(self.config.project.root).as_posix()}",
)
if stage.type == "summarize":
output_path = self.artifacts.write_stage_output(
task.id,
@ -703,7 +777,10 @@ class PipelineRunner:
def _writer_agent_stage(self, stage: StageConfig, retry_count: int) -> StageConfig:
suffix = f"-{retry_count}" if retry_count else ""
return replace(stage, output=f"{stage.id}-agent-output{suffix}.md")
return replace(
self._stage_for_retry_agent(stage, retry_count),
output=f"{stage.id}-agent-output{suffix}.md",
)
def _stage_after_patch_flow(self, current_stage_id: str) -> str | None:
stages = list(self.config.pipeline.stages)
@ -791,6 +868,7 @@ class PipelineRunner:
self.config.safety,
max_files=stage.max_files or DEFAULT_MAX_FILES,
max_changed_lines=stage.max_lines or DEFAULT_MAX_CHANGED_LINES,
max_delete_ratio=stage.max_delete_ratio,
forbidden_paths=stage.forbidden_paths or DEFAULT_FORBIDDEN_PATHS,
)
except PipelineError as exc:
@ -834,6 +912,7 @@ class PipelineRunner:
self.config.safety,
max_files=stage.max_files or DEFAULT_MAX_FILES,
max_changed_lines=stage.max_lines or DEFAULT_MAX_CHANGED_LINES,
max_delete_ratio=stage.max_delete_ratio,
forbidden_paths=stage.forbidden_paths or DEFAULT_FORBIDDEN_PATHS,
)
except PipelineError as exc:
@ -886,6 +965,163 @@ class PipelineRunner:
path = self.artifacts.create_task_dir(task_id).directory / filename
return path if path.exists() else None
def _stage_for_retry_agent(self, stage: StageConfig, retry_count: int) -> StageConfig:
if not stage.agent_pool:
return stage
index = min(retry_count, len(stage.agent_pool) - 1)
return replace(stage, agent=stage.agent_pool[index])
def _maybe_satisfy_resource_request(
self,
stage: StageConfig,
task: Task,
result: StageResult,
) -> StageResult | None:
output_text = self._read_output(result.output_path)
requests = parse_resource_requests(extract_agent_stdout(output_text))
if not requests:
return None
paths = satisfy_resource_requests(self.artifacts, task.id, requests)
report_path = self.artifacts.write_stage_output(
task.id,
"resource-requests.md",
format_resource_report(requests, paths, self.config.project.root),
)
return StageResult(
stage.id,
"pass",
"Blocked resource requests were satisfied in the active run directory.",
output_path=str(report_path.relative_to(self.config.project.root)),
context_update=(
"Generated run-local resources: "
+ ", ".join(path.relative_to(self.config.project.root).as_posix() for path in paths)
),
)
def _write_failure_diagnostics(
self,
stage: StageConfig,
task: Task,
result: StageResult,
retry_count: int,
) -> str:
output = self._read_output(result.output_path)
if not output and not result.reason:
return ""
exit_code = _extract_exit_code(output) or _extract_exit_code(result.reason)
modified_files = self._modified_files()
classification = classify_failure(
"\n".join([result.reason, output]),
exit_code=exit_code,
modified_files=modified_files,
)
filename = f"diagnostics/{stage.id}-failure"
if retry_count:
filename += f"-retry-{retry_count}"
filename += ".md"
diagnostic_path = self.artifacts.write_stage_output(
task.id,
filename,
format_failure_classification(
classification,
exit_code=exit_code,
modified_files=modified_files,
),
)
if classification.category == "missing dependency":
dependency_path = self.artifacts.write_stage_output(
task.id,
"diagnostics/dependency-diagnostic.md",
format_dependency_diagnostic(
diagnose_python_dependencies(self.config.project.root, output)
),
)
return (
f"Failure classification: {classification.category}; "
f"diagnostic: {diagnostic_path.relative_to(self.config.project.root).as_posix()}; "
f"dependency diagnostic: {dependency_path.relative_to(self.config.project.root).as_posix()}."
)
return (
f"Failure classification: {classification.category}; "
f"root cause: {classification.probable_root_cause}; "
f"diagnostic: {diagnostic_path.relative_to(self.config.project.root).as_posix()}."
)
def _run_debugger_if_configured(
self,
task: Task,
result: StageResult,
retry_notes: list[str],
) -> str:
debugger_id = next(
(
agent_id
for agent_id, agent in self.config.agents.items()
if agent.role == "debugger" or agent_id == "debugger"
),
None,
)
if debugger_id is None:
return ""
stage = StageConfig(
id="debugger",
type="agent",
agent=debugger_id,
output="debugger.md",
)
output = self._read_output(result.output_path)
context = self.context.read_context(task, retry_notes)
debug_result = self.agent_executor.run_stage(
stage,
task,
{"failed_stage": result.reason, "failure_output": output},
retry_notes,
project_context=context.project_context,
task_context=context.task_context,
retry_context=context.retry_context,
)
return f"Debugger output: {debug_result.output_path or 'none'}."
def _modified_files(self) -> tuple[str, ...]:
completed = subprocess.run(
["git", "status", "--short"],
cwd=self.config.project.root,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
if completed.returncode != 0:
return ()
files: list[str] = []
for line in completed.stdout.splitlines():
if len(line) > 3:
files.append(line[3:].strip())
return tuple(files)
def _telemetry_entry(
self,
stage: StageConfig,
result: StageResult,
retry_count: int,
) -> TelemetryEntry:
effective_stage = self._stage_for_retry_agent(stage, retry_count)
agent = self.config.agents.get(effective_stage.agent) if effective_stage.agent else None
return telemetry_from_stage_output(
stage_id=result.stage_id,
stage_type=stage.type,
status=result.status,
output=self._read_output(result.output_path),
retry_count=retry_count,
agent_id=agent.id if agent else None,
model=agent.model if agent else None,
)
def _write_telemetry(self, task_id: str, entries: list[TelemetryEntry]) -> None:
summary = format_telemetry_summary(tuple(entries))
self.artifacts.write_stage_output(task_id, "telemetry-summary.md", summary)
self.artifacts.run_dir.joinpath("telemetry-summary.md").write_text(summary, encoding="utf-8")
def _maybe_rerun_agent_with_repo_lookup(
self,
stage: StageConfig,
@ -1128,6 +1364,17 @@ def _attempt_filename(filename: str, retry_count: int) -> str:
return path.with_name(name).as_posix()
def _extract_exit_code(text: str) -> int | None:
match = re.search(r"Exit code:\s*(-?\d+)|code\s+(-?\d+)", text)
if not match:
return None
value = match.group(1) or match.group(2)
try:
return int(value)
except (TypeError, ValueError):
return None
def format_aggregate_run_summary(results: list[PipelineResult], status: str, reason: str) -> str:
lines = [
"# Run Summary",

View File

@ -0,0 +1,10 @@
# Debugger
You diagnose failed attempts for NightShift.
Output:
- concise diagnosis
- recommended next action
- do not modify guidance
Do not directly modify files.

View File

@ -31,6 +31,12 @@ agents:
command: echo
system_prompt: agents/reviewer.md
debugger:
backend: command
command: echo
role: debugger
system_prompt: agents/debugger.md
pipeline:
max_task_retries: 3
stages:
@ -47,7 +53,8 @@ pipeline:
- id: implement
type: agent
agent: implementer
agent_pool:
- implementer
output: implementation-log.md
- id: test

View File

@ -54,6 +54,13 @@ agents:
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
pipeline:
max_task_retries: 4
continue_on_task_failure: false
@ -74,7 +81,9 @@ pipeline:
- id: implement_junior
type: file_writer
agent: junior
agent_pool:
- junior
- senior
output: proposed.patch
- id: normalize_junior
@ -86,6 +95,7 @@ pipeline:
output: patch-validation.md
max_files: 16
max_lines: 1400
max_delete_ratio: 0.70
on_fail: implement_senior
- id: apply_junior
@ -123,6 +133,7 @@ pipeline:
output: senior-patch-validation.md
max_files: 20
max_lines: 1800
max_delete_ratio: 0.70
on_fail: implement_senior
- id: apply_senior

View File

@ -41,6 +41,13 @@ agents:
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
pipeline:
max_task_retries: 2
continue_on_task_failure: false
@ -56,7 +63,8 @@ pipeline:
- id: implement
type: file_writer
agent: implementer
agent_pool:
- implementer
output: proposed.patch
- id: normalize
@ -68,6 +76,7 @@ pipeline:
output: patch-validation.md
max_files: 8
max_lines: 700
max_delete_ratio: 0.70
on_fail: implement
- id: apply_patch

View File

@ -18,7 +18,7 @@ Dependencies:
- TASK-001
Description:
Add image attachment support for new threads and replies. Store uploaded image metadata in SQLite, save uploaded files under `static/uploads`, and generate thumbnails under `static/thumbs`.
Add image attachment support for new threads and replies. Store uploaded image metadata in SQLite, save uploaded files under `static/uploads`, and generate thumbnails under `static/thumbs`. There is a test image in tests/test_images/test.png
Acceptance Criteria:
- Accepts image uploads for threads and replies

View File

@ -44,6 +44,13 @@ agents:
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
pipeline:
max_task_retries: 3
continue_on_task_failure: false
@ -59,7 +66,8 @@ pipeline:
- id: implement
type: file_writer
agent: implementer
agent_pool:
- implementer
output: proposed.patch
- id: normalize
@ -71,6 +79,7 @@ pipeline:
output: patch-validation.md
max_files: 10
max_lines: 900
max_delete_ratio: 0.70
on_fail: implement
- id: apply_patch

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.4 MiB

View File

@ -0,0 +1,11 @@
You are the implementation agent for NightShift.
Output only complete file content blocks.
Use one fenced block per file with this exact opening form:
```file:relative/path.py
<complete file content>
```
Do not include explanations before or after the file blocks.
Include tests when needed.
Keep the change as small as possible.
Only edit files needed for the task.

View File

@ -0,0 +1,19 @@
You are the planning agent for NightShift.
Create a concise implementation plan for the current task.
If you need repository context before planning, output lookup requests exactly like this:
lookup_requests:
- tool: read_file
path: relative/path.py
- tool: grep
path: .
pattern: search_regex
After context is provided, write a short plan with:
- files to edit
- tests to add or update
- risks
Do not write code.

View File

@ -0,0 +1,14 @@
You are the review agent for NightShift.
Review the task, plan, patch artifacts, test output, and final state.
Output exactly:
status: pass | fail | retry | escalate
reason: <short explanation>
next_stage: <optional stage id>
context_update: <compact useful note>
Use retry when the implementation is close but needs another patch.
Use fail when the patch is unsafe, unrelated, or clearly broken.
Use pass only when the acceptance criteria are satisfied.

View File

@ -0,0 +1,76 @@
# Tasks
- [ ] TASK-001: REPL foundation
Description:
Create the initial Lisp REPL application. Implement the command-line entry point, read-eval-print loop, basic input handling, graceful exit commands, and tests. Keep source code under `src/`, tests under `tests/`.
Acceptance Criteria:
- Provides a CLI entry point for starting the REPL
- Reads user input in a loop
- Prints evaluation results
- Supports exit commands like `exit`, `quit`, or Ctrl-D
- Handles blank input without crashing
- Includes basic REPL loop tests
- [ ] TASK-002: Tokenizer and parser
Dependencies:
- TASK-001
Description:
Implement tokenization and parsing for Lisp expressions. Convert source text into tokens, then parse tokens into an AST representation for atoms, numbers, symbols, and nested lists.
Acceptance Criteria:
- Tokenizes parentheses, symbols, numbers, and strings
- Parses simple atoms
- Parses nested S-expressions
- Reports helpful syntax errors for unbalanced parentheses
- Includes tokenizer and parser tests
- [ ] TASK-003: Evaluator and environment
Dependencies:
- TASK-002
Description:
Implement the evaluator and runtime environment. Support symbol lookup, literal values, basic arithmetic, variable definitions, and nested expression evaluation.
Acceptance Criteria:
- Evaluates numeric literals and symbols
- Supports `+`, `-`, `*`, and `/`
- Supports nested arithmetic expressions
- Implements an environment for symbol bindings
- Supports `define`
- Includes evaluator and environment tests
- [ ] TASK-004: Special forms and functions
Dependencies:
- TASK-003
Description:
Add core Lisp special forms and user-defined functions. Implement `quote`, `if`, `lambda`, and function application with lexical scoping.
Acceptance Criteria:
- Supports quoted expressions
- Supports conditional evaluation with `if`
- Supports anonymous functions with `lambda`
- Supports calling user-defined functions
- Preserves lexical scope through closures
- Includes special form and function tests
- [ ] TASK-005: REPL usability and error handling
Dependencies:
- TASK-004
Description:
Improve the REPL user experience. Add multiline input for incomplete expressions, readable error messages, command history if supported by the platform, and a small standard library of helper functions.
Acceptance Criteria:
- Supports multiline input for unfinished expressions
- Displays readable parse and evaluation errors
- Does not crash on invalid user input
- Provides a small standard library such as `list`, `car`, `cdr`, `cons`, and comparison operators
- Includes REPL behavior and error handling tests

View File

@ -0,0 +1,27 @@
# NightShift Imageboard Target
This project was created with:
```bash
nightshift init --template tutorial-imageboard
```
NightShift control files live in `.nightshift/`. Target application code should live under `src/`, tests under `tests/`, templates under `templates/`, and uploaded/generated static files under `static/`.
Install target dependencies:
```bash
python -m pip install flask pillow pytest
```
Validate the project:
```bash
nightshift validate
```
Run the first task:
```bash
nightshift run --task TASK-001
```

View File

@ -0,0 +1,108 @@
project:
name: imageboard
root: .
task_file: .nightshift/tasks.md
artifact_dir: .nightshift
safety:
require_clean_worktree: false
scoped_paths:
- .
- src
- tests
- templates
- static
- schema.sql
- pyproject.toml
allowed_commands:
- python -m pytest -q
forbidden_commands:
- rm -rf
- git push
- curl | bash
experiment:
label: imageboard-real-model
prompt_variant: ollama-qwen25-coder-14b-v1
agents:
planner:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.2
system_prompt: .nightshift/agents/planner.md
implementer:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
reviewer:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
pipeline:
max_task_retries: 3
continue_on_task_failure: false
stages:
- id: plan
type: agent
agent: planner
output: plan.md
- id: context
type: repo_context
output: context-pack.md
- id: implement
type: file_writer
agent_pool:
- implementer
output: proposed.patch
- id: normalize
type: patch_normalizer
output: normalized.patch
- id: validate_patch
type: patch_validator
output: patch-validation.md
max_files: 10
max_lines: 900
max_delete_ratio: 0.70
on_fail: implement
- id: apply_patch
type: patch_apply
mode: apply
output: patch-apply-output.txt
on_fail: implement
- id: test
type: command
commands:
- python -m pytest -q
output: test-output.txt
shell: true
timeout_seconds: 20
on_fail: implement
- id: review
type: agent_review
agent: reviewer
on_fail: implement
output: review.md
- id: summarize
type: summarize
output: final-notes.md

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,7 @@
You are the debugger agent for the NightShift pastebin tutorial.
Diagnose failed attempts without editing files.
Return:
- concise diagnosis
- recommended next action
- do not modify guidance

View File

@ -0,0 +1,9 @@
You are the implementation agent for the NightShift pastebin tutorial.
Output only complete file content blocks.
Use one fenced block per file:
```file:relative/path.py
<complete file content>
```
Keep changes small, deterministic, and covered by tests.

View File

@ -0,0 +1,7 @@
You are the planning agent for the NightShift pastebin tutorial.
Create a concise implementation plan for the current task.
If repository context is needed, request it with lookup_requests.
Prefer small edits and deterministic tests.
Do not write code.

View File

@ -0,0 +1,8 @@
You are the review agent for the NightShift pastebin tutorial.
Output exactly:
status: pass | fail | retry | escalate
reason: <short explanation>
next_stage: <optional stage id>
context_update: <compact useful note>

View File

@ -0,0 +1,56 @@
# Pastebin Tutorial Tasks
- [ ] TASK-001: Snippet creation and viewing
Description:
Complete the pastebin service foundation. Support creating snippets with title, body, optional language, optional tags, and optional expiration date. Support viewing a single snippet by id.
Acceptance Criteria:
- POST `/snippets` creates a snippet with title and body
- GET `/snippets/<id>` returns the snippet
- Optional language, tags, and expires_at fields are persisted
- Tests cover creation and viewing
- [ ] TASK-002: Snippet listing and filtering
Dependencies:
- TASK-001
Description:
Add snippet listing with newest-first ordering and deterministic search/filter behavior.
Acceptance Criteria:
- GET `/snippets` lists snippets newest first
- `q` filters by title or body text
- `language` filters by language
- `tag` filters by tag
- Tests cover listing, search, and filters
- [ ] TASK-003: Expiration handling
Dependencies:
- TASK-002
Description:
Hide expired snippets from list/search results while keeping direct lookup behavior explicit.
Acceptance Criteria:
- Expired snippets are excluded from GET `/snippets`
- Direct lookup of an expired snippet returns 410
- Non-expiring snippets remain visible
- Tests cover expired and active snippets
- [ ] TASK-004: HTML forms and templates
Dependencies:
- TASK-003
Description:
Add simple HTML pages for creating, listing, filtering, and viewing snippets.
Acceptance Criteria:
- GET `/` shows the snippet list
- GET `/new` shows a creation form
- Creating a snippet redirects to the snippet view
- Templates expose language, tags, and expiration fields
- Tests cover HTML response status and redirects

View File

@ -0,0 +1,55 @@
# NightShift Pastebin Tutorial
This template is a small deterministic snippet-hosting service for testing NightShift orchestration.
Create it with:
```bash
nightshift init --template tutorial-pastebin
```
Or create an isolated integration sandbox from the NightShift repository root:
```bash
python -m nightshift.cli integ-run --template tutorial-pastebin
cd integ_runs/<timestamp>/project
```
Activate the generated virtual environment.
PowerShell:
```powershell
..\.venv\Scripts\Activate.ps1
python -m pip install -e ..\..\..
```
Bash:
```bash
source ../.venv/bin/activate
python -m pip install -e ../../..
```
Install target dependencies:
```bash
python -m pip install -e . pytest flask
```
Validate and run:
```bash
nightshift validate
nightshift run --task TASK-001
```
When running from an integration sandbox, the same commands are run inside `integ_runs/<timestamp>/project`.
The pipeline uses model fallback ordering for implementation attempts:
1. `qwen2.5-coder:14b`
2. `carstenuhlig/omnicoder-9b`
3. `deepseek-coder-v2:16b`
Telemetry artifacts record which agent/model handled each stage and estimate token usage.

View File

@ -0,0 +1,124 @@
project:
name: pastebin
root: .
task_file: .nightshift/tasks.md
artifact_dir: .nightshift
safety:
require_clean_worktree: false
scoped_paths:
- src
- tests
- templates
- pyproject.toml
- README.md
allowed_commands:
- python -m pytest -q
forbidden_commands:
- rm -rf
- git push
- curl | bash
experiment:
label: pastebin-model-fallback
prompt_variant: qwen-omnicoder-deepseek-v1
agents:
planner:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.2
system_prompt: .nightshift/agents/planner.md
implementer_qwen:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
implementer_omnicoder:
backend: ollama
model: carstenuhlig/omnicoder-9b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
implementer_deepseek:
backend: ollama
model: deepseek-coder-v2:16b
temperature: 0.1
system_prompt: .nightshift/agents/implementer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
reviewer:
backend: ollama
model: qwen2.5-coder:14b
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
pipeline:
max_task_retries: 3
continue_on_task_failure: false
stages:
- id: plan
type: agent
agent: planner
output: plan.md
- id: semantic_context
type: semantic_context
output: semantic-context.md
- id: context
type: repo_context
output: context-pack.md
- id: implement
type: file_writer
agent_pool:
- implementer_qwen
- implementer_omnicoder
- implementer_deepseek
output: proposed.patch
- id: normalize
type: patch_normalizer
output: normalized.patch
- id: validate_patch
type: patch_validator
output: patch-validation.md
max_files: 12
max_lines: 900
max_delete_ratio: 0.70
on_fail: implement
- id: apply_patch
type: patch_apply
mode: apply
output: patch-apply-output.txt
on_fail: implement
- id: test
type: command
commands:
- python -m pytest -q
output: test-output.txt
shell: true
timeout_seconds: 25
on_fail: implement
- id: review
type: agent_review
agent: reviewer
output: review.md
on_fail: implement
- id: summarize
type: summarize
output: final-notes.md

View File

@ -0,0 +1,12 @@
[build-system]
requires = ["setuptools>=69"]
build-backend = "setuptools.build_meta"
[project]
name = "nightshift-pastebin-target"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = ["flask"]
[tool.setuptools.packages.find]
where = ["src"]

View File

@ -0,0 +1,3 @@
from .app import create_app
__all__ = ["create_app"]

View File

@ -0,0 +1,128 @@
from __future__ import annotations
from datetime import datetime, timezone
import sqlite3
from pathlib import Path
from flask import Flask, abort, g, jsonify, redirect, render_template, request, url_for
SCHEMA = """
create table if not exists snippets (
id integer primary key autoincrement,
title text not null,
body text not null,
language text default '',
tags text default '',
expires_at text default '',
created_at text not null
);
"""
def create_app(database_path: str | Path | None = None) -> Flask:
app = Flask(__name__, template_folder=str(Path(__file__).resolve().parents[2] / "templates"))
app.config["DATABASE"] = str(database_path or Path(app.instance_path) / "pastebin.sqlite3")
@app.before_request
def _open_db() -> None:
Path(app.config["DATABASE"]).parent.mkdir(parents=True, exist_ok=True)
g.db = sqlite3.connect(app.config["DATABASE"])
g.db.row_factory = sqlite3.Row
g.db.execute(SCHEMA)
@app.teardown_request
def _close_db(exc) -> None:
db = g.pop("db", None)
if db is not None:
db.close()
@app.get("/")
def index():
snippets = list_snippets(g.db, request.args)
return render_template("index.html", snippets=snippets)
@app.get("/new")
def new_snippet():
return render_template("new.html")
@app.post("/snippets")
def create_snippet_route():
snippet_id = create_snippet(g.db, request.form or request.json or {})
wants_json = request.is_json or "application/json" in request.headers.get("Accept", "")
if wants_json:
return jsonify(get_snippet(g.db, snippet_id)), 201
return redirect(url_for("view_snippet", snippet_id=snippet_id))
@app.get("/snippets")
def list_snippets_route():
snippets = list_snippets(g.db, request.args)
if "application/json" in request.headers.get("Accept", ""):
return jsonify(snippets)
return render_template("index.html", snippets=snippets)
@app.get("/snippets/<int:snippet_id>")
def view_snippet(snippet_id: int):
snippet = get_snippet(g.db, snippet_id)
if snippet is None:
abort(404)
if is_expired(snippet):
abort(410)
if "application/json" in request.headers.get("Accept", ""):
return jsonify(snippet)
return render_template("view.html", snippet=snippet)
return app
def create_snippet(db: sqlite3.Connection, data) -> int:
title = str(data.get("title", "")).strip()
body = str(data.get("body", "")).strip()
if not title or not body:
raise ValueError("title and body are required")
cursor = db.execute(
"insert into snippets(title, body, language, tags, expires_at, created_at) values (?, ?, ?, ?, ?, ?)",
(
title,
body,
str(data.get("language", "")).strip(),
str(data.get("tags", "")).strip(),
str(data.get("expires_at", "")).strip(),
datetime.now(timezone.utc).isoformat(),
),
)
db.commit()
return int(cursor.lastrowid)
def get_snippet(db: sqlite3.Connection, snippet_id: int) -> dict | None:
row = db.execute("select * from snippets where id = ?", (snippet_id,)).fetchone()
return dict(row) if row else None
def list_snippets(db: sqlite3.Connection, args) -> list[dict]:
rows = db.execute("select * from snippets order by id desc").fetchall()
snippets = [dict(row) for row in rows if not is_expired(dict(row))]
query = str(args.get("q", "")).lower()
language = str(args.get("language", "")).lower()
tag = str(args.get("tag", "")).lower()
if query:
snippets = [item for item in snippets if query in item["title"].lower() or query in item["body"].lower()]
if language:
snippets = [item for item in snippets if item["language"].lower() == language]
if tag:
snippets = [item for item in snippets if tag in [part.strip().lower() for part in item["tags"].split(",")]]
return snippets
def is_expired(snippet: dict) -> bool:
value = snippet.get("expires_at") or ""
if not value:
return False
try:
expires = datetime.fromisoformat(value)
except ValueError:
return False
if expires.tzinfo is None:
expires = expires.replace(tzinfo=timezone.utc)
return expires <= datetime.now(timezone.utc)

View File

@ -0,0 +1,18 @@
<!doctype html>
<html lang="en">
<body>
<h1>Snippets</h1>
<a href="/new">New snippet</a>
<form method="get" action="/snippets">
<input name="q" placeholder="Search">
<input name="language" placeholder="Language">
<input name="tag" placeholder="Tag">
<button type="submit">Filter</button>
</form>
<ul>
{% for snippet in snippets %}
<li><a href="/snippets/{{ snippet.id }}">{{ snippet.title }}</a> {{ snippet.language }} {{ snippet.tags }}</li>
{% endfor %}
</ul>
</body>
</html>

View File

@ -0,0 +1,14 @@
<!doctype html>
<html lang="en">
<body>
<h1>New Snippet</h1>
<form method="post" action="/snippets">
<input name="title" placeholder="Title" required>
<textarea name="body" required></textarea>
<input name="language" placeholder="Language">
<input name="tags" placeholder="Tags">
<input name="expires_at" placeholder="Expires at ISO timestamp">
<button type="submit">Create</button>
</form>
</body>
</html>

View File

@ -0,0 +1,8 @@
<!doctype html>
<html lang="en">
<body>
<h1>{{ snippet.title }}</h1>
<p>{{ snippet.language }} {{ snippet.tags }}</p>
<pre>{{ snippet.body }}</pre>
</body>
</html>

View File

@ -0,0 +1,51 @@
from datetime import datetime, timedelta, timezone
from pastebin_app import create_app
def client(tmp_path):
app = create_app(tmp_path / "pastebin.sqlite3")
app.config["TESTING"] = True
return app.test_client()
def test_create_and_view_snippet(tmp_path):
test_client = client(tmp_path)
response = test_client.post(
"/snippets",
json={"title": "Hello", "body": "print('hi')", "language": "python", "tags": "demo,test"},
headers={"Accept": "application/json"},
)
assert response.status_code == 201
snippet_id = response.get_json()["id"]
view = test_client.get(f"/snippets/{snippet_id}", headers={"Accept": "application/json"})
assert view.status_code == 200
assert view.get_json()["language"] == "python"
def test_list_search_and_filters(tmp_path):
test_client = client(tmp_path)
test_client.post("/snippets", json={"title": "Python note", "body": "flask route", "language": "python", "tags": "web"})
test_client.post("/snippets", json={"title": "SQL note", "body": "select", "language": "sql", "tags": "data"})
search = test_client.get("/snippets?q=flask", headers={"Accept": "application/json"}).get_json()
language = test_client.get("/snippets?language=sql", headers={"Accept": "application/json"}).get_json()
tag = test_client.get("/snippets?tag=web", headers={"Accept": "application/json"}).get_json()
assert [item["title"] for item in search] == ["Python note"]
assert [item["title"] for item in language] == ["SQL note"]
assert [item["title"] for item in tag] == ["Python note"]
def test_expired_snippet_hidden_and_direct_lookup_gone(tmp_path):
test_client = client(tmp_path)
expired = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
response = test_client.post("/snippets", json={"title": "Old", "body": "gone", "expires_at": expired}, headers={"Accept": "application/json"})
snippet_id = response.get_json()["id"]
listed = test_client.get("/snippets", headers={"Accept": "application/json"}).get_json()
direct = test_client.get(f"/snippets/{snippet_id}", headers={"Accept": "application/json"})
assert listed == []
assert direct.status_code == 410

View File

@ -50,6 +50,11 @@ class ReportGenerator:
"stage-results.md",
format_stage_results(task, status, reason, retry_count, stage_results),
)
artifact_index_path = self.artifacts.write_stage_output(
task.id,
"artifact-index.md",
format_artifact_index(self.artifacts.create_task_dir(task.id).directory),
)
final_notes_path = self.artifacts.write_final_task_notes(
task.id,
format_task_report(
@ -61,6 +66,7 @@ class ReportGenerator:
modified_files=modified_files,
stage_results_path=stage_results_path,
context_out_path=context_out_path,
artifact_index_path=artifact_index_path,
experiment_label=self.experiment_label,
prompt_variant=self.prompt_variant,
),
@ -138,6 +144,7 @@ def format_task_report(
modified_files: list[str],
stage_results_path: Path,
context_out_path: Path | None,
artifact_index_path: Path | None,
experiment_label: str | None = None,
prompt_variant: str | None = None,
) -> str:
@ -149,6 +156,8 @@ def format_task_report(
]
if context_out_path is not None:
artifact_lines.append(f"- Context out: `{context_out_path.name}`")
if artifact_index_path is not None:
artifact_lines.append(f"- Artifact index: `{artifact_index_path.name}`")
modified = "\n".join(f"- `{path}`" for path in modified_files) if modified_files else "- Unavailable or none detected"
return "\n".join(
@ -186,6 +195,37 @@ def format_task_report(
)
def format_artifact_index(task_dir: Path) -> str:
groups: dict[str, list[str]] = {
"Core": [],
"Patch Flow": [],
"Diagnostics": [],
"Retries": [],
"Resources": [],
"Other": [],
}
for path in sorted(item for item in task_dir.rglob("*") if item.is_file()):
relative = path.relative_to(task_dir).as_posix()
target = "Other"
if relative in {"task.md", "context.md", "context-out.md", "stage-results.md", "task-completion.md", "final-notes.md"}:
target = "Core"
elif relative.endswith(".patch") or "patch-" in relative or "normalized" in relative:
target = "Patch Flow"
elif relative.startswith("diagnostics/") or "failure" in relative:
target = "Diagnostics"
elif relative.startswith("retries/") or "retry" in relative or "repair" in relative:
target = "Retries"
elif relative.startswith("resources/") or relative == "resource-requests.md":
target = "Resources"
groups[target].append(relative)
lines = ["# Artifact Index", ""]
for name, paths in groups.items():
lines.extend([f"## {name}", ""])
lines.extend(f"- `{path}`" for path in paths) if paths else lines.append("- None")
lines.append("")
return "\n".join(lines)
def format_run_summary(
task: Task,
status: str,

95
nightshift/resources.py Normal file
View File

@ -0,0 +1,95 @@
"""Structured blocked/resource request handling."""
from __future__ import annotations
from dataclasses import dataclass
import json
from pathlib import Path
import re
import sqlite3
from .artifacts import ArtifactStore
from .errors import PipelineError
SUPPORTED_RESOURCE_TYPES = {"png", "jpg", "jpeg", "json", "sqlite", "text", "blob"}
@dataclass(frozen=True)
class ResourceRequest:
kind: str
path: str
reason: str = ""
def parse_resource_requests(text: str) -> tuple[ResourceRequest, ...]:
"""Parse simple blocked/resource requests from agent stdout."""
requests: list[ResourceRequest] = []
for match in re.finditer(
r"blocked_request:\s*(?P<kind>[A-Za-z0-9_-]+)\s+(?P<path>[^\s]+)(?:\s+(?P<reason>.*))?",
text,
flags=re.IGNORECASE,
):
requests.append(
ResourceRequest(
kind=match.group("kind").lower(),
path=match.group("path").strip(),
reason=(match.group("reason") or "").strip(),
)
)
return tuple(requests)
def satisfy_resource_requests(artifacts: ArtifactStore, task_id: str, requests: tuple[ResourceRequest, ...]) -> tuple[Path, ...]:
written: list[Path] = []
base = artifacts.create_task_dir(task_id).directory / "resources"
base.mkdir(parents=True, exist_ok=True)
for request in requests:
kind = request.kind.lower()
if kind not in SUPPORTED_RESOURCE_TYPES:
raise PipelineError(f"Blocked resource request has unsupported type '{request.kind}'.")
safe_name = _safe_relative_path(request.path)
target = base / safe_name
target.parent.mkdir(parents=True, exist_ok=True)
if kind == "json":
target.write_text(json.dumps({"generated_by": "nightshift", "reason": request.reason}, indent=2) + "\n", encoding="utf-8")
elif kind == "sqlite":
with sqlite3.connect(target) as connection:
connection.execute("create table if not exists nightshift_fixture (id integer primary key, value text)")
connection.execute("insert into nightshift_fixture(value) values (?)", ("generated",))
elif kind in {"text", "blob"}:
target.write_text(request.reason or "generated fixture\n", encoding="utf-8")
else:
target.write_bytes(_tiny_image_bytes(kind))
written.append(target)
return tuple(written)
def format_resource_report(requests: tuple[ResourceRequest, ...], paths: tuple[Path, ...], root: Path) -> str:
lines = ["# Resource Requests", ""]
for request, path in zip(requests, paths):
lines.extend(
[
f"- Type: {request.kind}",
f" Path: `{path.relative_to(root).as_posix()}`",
f" Reason: {request.reason}",
]
)
if not requests:
lines.append("- None")
lines.append("")
return "\n".join(lines)
def _safe_relative_path(path_text: str) -> Path:
path = Path(path_text.replace("\\", "/"))
if path.is_absolute() or ".." in path.parts or not path.name:
raise PipelineError(f"Blocked resource request has unsafe path '{path_text}'.")
return path
def _tiny_image_bytes(kind: str) -> bytes:
if kind in {"jpg", "jpeg"}:
return bytes.fromhex("ffd8ffe000104a46494600010101000100010000ffdb004300" + "08" * 64 + "ffc00011080001000103012200021101031101ffc40014000100000000000000000000000000000000000000ffc40014100100000000000000000000000000000000000000ffda000c03010002110311003f00d2cf20ffd9")
return bytes.fromhex("89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4890000000a49444154789c63000100000500010d0a2db40000000049454e44ae426082")

View File

@ -0,0 +1,39 @@
"""Compact retry memory artifacts."""
from __future__ import annotations
from dataclasses import dataclass
from .stages import StageResult
@dataclass(frozen=True)
class RetryMemoryEntry:
attempt: int
stage_id: str
status: str
cause: str
next_stage: str
def summarize_retry_memory(entries: tuple[RetryMemoryEntry, ...]) -> str:
lines = ["# Retry Memory", ""]
if not entries:
lines.append("- None")
for entry in entries[-8:]:
lines.append(
f"- Attempt {entry.attempt}: `{entry.stage_id}` returned {entry.status}; "
f"cause: {entry.cause}; next: `{entry.next_stage}`"
)
lines.append("")
return "\n".join(lines)
def entry_from_stage(attempt: int, result: StageResult, next_stage: str) -> RetryMemoryEntry:
return RetryMemoryEntry(
attempt=attempt,
stage_id=result.stage_id,
status=result.status,
cause=result.reason,
next_stage=next_stage,
)

View File

@ -0,0 +1,146 @@
"""Lightweight repository semantic index."""
from __future__ import annotations
from dataclasses import dataclass
import ast
from pathlib import Path
import re
from .config import SafetyConfig
from .safety import resolve_project_root, validate_scoped_paths
@dataclass(frozen=True)
class IndexedFile:
path: str
symbols: tuple[str, ...]
imports: tuple[str, ...]
tests: tuple[str, ...]
keywords: tuple[str, ...]
snippet: str
def build_semantic_index(project_root: str | Path, safety: SafetyConfig, *, max_files: int = 120) -> tuple[IndexedFile, ...]:
root = resolve_project_root(project_root)
scoped_roots = validate_scoped_paths(root, safety.scoped_paths or (".",))
files: list[IndexedFile] = []
for scoped_root in scoped_roots:
for path in sorted(scoped_root.rglob("*")):
if len(files) >= max_files:
return tuple(files)
if not path.is_file() or _skip(path, root):
continue
relative = path.relative_to(root).as_posix()
text = path.read_text(encoding="utf-8", errors="replace")
files.append(_index_file(relative, text))
return tuple(files)
def search_index(index: tuple[IndexedFile, ...], query: str, *, limit: int = 5) -> tuple[IndexedFile, ...]:
query_terms = _keywords(query)
scored: list[tuple[int, IndexedFile]] = []
for item in index:
haystack = set(item.keywords) | set(_keywords(item.path)) | set(_keywords(" ".join(item.symbols + item.imports + item.tests)))
score = sum(3 if term in item.symbols or term in item.tests else 1 for term in query_terms if term in haystack)
if score:
scored.append((score, item))
scored.sort(key=lambda pair: (-pair[0], pair[1].path))
return tuple(item for _, item in scored[:limit])
def format_semantic_index(index: tuple[IndexedFile, ...]) -> str:
lines = ["# Semantic Index", "", f"Files indexed: {len(index)}", ""]
for item in index:
lines.extend(
[
f"## `{item.path}`",
"",
f"- Symbols: {', '.join(item.symbols) or 'None'}",
f"- Imports: {', '.join(item.imports) or 'None'}",
f"- Tests: {', '.join(item.tests) or 'None'}",
"",
"```text",
item.snippet,
"```",
"",
]
)
return "\n".join(lines)
def format_search_results(results: tuple[IndexedFile, ...], query: str) -> str:
lines = ["# Semantic Context", "", f"Query: {query}", ""]
if not results:
lines.append("- No matching files.")
lines.append("")
return "\n".join(lines)
for item in results:
lines.extend(
[
f"## `{item.path}`",
"",
f"- Symbols: {', '.join(item.symbols) or 'None'}",
f"- Tests: {', '.join(item.tests) or 'None'}",
"",
"```text",
item.snippet,
"```",
"",
]
)
return "\n".join(lines)
def _index_file(path: str, text: str) -> IndexedFile:
symbols: list[str] = []
imports: list[str] = []
tests: list[str] = []
if path.endswith(".py"):
try:
tree = ast.parse(text)
except SyntaxError:
tree = None
if tree is not None:
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
symbols.append(node.name)
if node.name.startswith("test_"):
tests.append(node.name)
elif isinstance(node, ast.Import):
imports.extend(alias.name.split(".")[0] for alias in node.names)
elif isinstance(node, ast.ImportFrom) and node.module:
imports.append(node.module.split(".")[0])
return IndexedFile(
path=path,
symbols=tuple(_dedupe(symbols)),
imports=tuple(_dedupe(imports)),
tests=tuple(_dedupe(tests)),
keywords=tuple(_keywords(text + " " + path)),
snippet="\n".join(text.splitlines()[:40]),
)
def _skip(path: Path, root: Path) -> bool:
relative = path.relative_to(root).as_posix()
parts = set(Path(relative).parts)
if parts & {".git", ".nightshift", "__pycache__", ".venv", "venv", "integ_runs"}:
return True
return path.suffix.lower() not in {".py", ".md", ".txt", ".yaml", ".yml", ".toml", ".html", ".css", ".js"}
def _keywords(text: str) -> tuple[str, ...]:
expanded = re.sub(r"[_\d]+", " ", text)
words = list(re.findall(r"[A-Za-z][A-Za-z0-9_]{2,}", text))
words.extend(re.findall(r"[A-Za-z][A-Za-z]{1,}", expanded))
return tuple(_dedupe(word.lower() for word in words))
def _dedupe(values) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
if value and value not in seen:
seen.add(value)
result.append(value)
return result

122
nightshift/telemetry.py Normal file
View File

@ -0,0 +1,122 @@
"""Run telemetry aggregation."""
from __future__ import annotations
from dataclasses import dataclass
import re
@dataclass(frozen=True)
class TelemetryEntry:
stage_id: str
stage_type: str
status: str
agent_id: str | None
model: str | None
duration_seconds: float
prompt_tokens: int
output_tokens: int
retry_count: int
def telemetry_from_stage_output(
*,
stage_id: str,
stage_type: str,
status: str,
output: str,
retry_count: int,
agent_id: str | None = None,
model: str | None = None,
) -> TelemetryEntry:
parsed_agent = _field(output, "Agent") or agent_id
duration = _float_field(output, "Duration seconds")
prompt = _section(output, "Prompt")
stdout = _section(output, "stdout")
stderr = _section(output, "stderr")
if not prompt:
prompt = ""
if not stdout and not stderr:
stdout = output
return TelemetryEntry(
stage_id=stage_id,
stage_type=stage_type,
status=status,
agent_id=parsed_agent,
model=model,
duration_seconds=duration,
prompt_tokens=estimate_tokens(prompt),
output_tokens=estimate_tokens("\n".join([stdout, stderr])),
retry_count=retry_count,
)
def estimate_tokens(text: str) -> int:
if not text:
return 0
return max(1, (len(re.findall(r"\S+", text)) * 4 + 2) // 3)
def format_telemetry_summary(entries: tuple[TelemetryEntry, ...]) -> str:
total_duration = sum(entry.duration_seconds for entry in entries)
total_prompt = sum(entry.prompt_tokens for entry in entries)
total_output = sum(entry.output_tokens for entry in entries)
failures = sum(1 for entry in entries if entry.status != "pass")
lines = [
"# Telemetry Summary",
"",
f"Stages observed: {len(entries)}",
f"Failures observed: {failures}",
f"Total runtime seconds: {total_duration:.3f}",
f"Estimated prompt tokens: {total_prompt}",
f"Estimated output tokens: {total_output}",
f"Estimated total tokens: {total_prompt + total_output}",
"",
"## Per Model",
"",
]
by_model: dict[str, list[TelemetryEntry]] = {}
for entry in entries:
key = entry.model or entry.agent_id or entry.stage_type
by_model.setdefault(key, []).append(entry)
if not by_model:
lines.append("- None")
for model, model_entries in sorted(by_model.items()):
successes = sum(1 for entry in model_entries if entry.status == "pass")
lines.append(
f"- `{model}`: stages={len(model_entries)}, successes={successes}, "
f"failures={len(model_entries) - successes}, "
f"runtime={sum(entry.duration_seconds for entry in model_entries):.3f}s, "
f"tokens={sum(entry.prompt_tokens + entry.output_tokens for entry in model_entries)}"
)
lines.extend(["", "## Stages", ""])
for entry in entries:
lines.append(
f"- `{entry.stage_id}` ({entry.stage_type}): {entry.status}, "
f"agent={entry.agent_id or ''}, model={entry.model or ''}, "
f"retry={entry.retry_count}, runtime={entry.duration_seconds:.3f}s, "
f"tokens={entry.prompt_tokens + entry.output_tokens}"
)
lines.append("")
return "\n".join(lines)
def _field(text: str, name: str) -> str | None:
match = re.search(rf"^{re.escape(name)}:\s*`?([^`\n]+)`?", text, re.MULTILINE)
return match.group(1).strip() if match else None
def _float_field(text: str, name: str) -> float:
value = _field(text, name)
if value is None:
return 0.0
try:
return float(value)
except ValueError:
return 0.0
def _section(text: str, heading: str) -> str:
pattern = rf"^## {re.escape(heading)}\s*\n\n(?:```[A-Za-z0-9_-]*\n)?(.*?)(?:\n```)?(?:\n\n^## |\Z)"
match = re.search(pattern, text, flags=re.MULTILINE | re.DOTALL)
return match.group(1).strip() if match else ""

View File

@ -33,6 +33,12 @@ agents:
command: echo
system_prompt: agents/reviewer.md
debugger:
backend: command
command: echo
role: debugger
system_prompt: agents/debugger.md
pipeline:
max_task_retries: 3
stages:
@ -49,7 +55,8 @@ pipeline:
- id: implement
type: agent
agent: implementer
agent_pool:
- implementer
output: implementation-log.md
- id: test
@ -109,6 +116,18 @@ Rules:
- Write useful implementation notes.
"""
DEBUGGER_PROMPT = """# Debugger
You diagnose failed attempts for NightShift.
Output:
- concise diagnosis
- recommended next action
- do not modify guidance
Do not directly modify files.
"""
REVIEWER_PROMPT = """# Reviewer
You are the review agent for NightShift.
@ -168,6 +187,13 @@ agents:
temperature: 0.1
system_prompt: .nightshift/agents/reviewer.md
debugger:
backend: ollama
model: qwen2.5-coder:14b
role: debugger
temperature: 0.1
system_prompt: .nightshift/agents/debugger.md
pipeline:
max_task_retries: 3
continue_on_task_failure: false
@ -183,7 +209,8 @@ pipeline:
- id: implement
type: file_writer
agent: implementer
agent_pool:
- implementer
output: proposed.patch
- id: normalize
@ -195,6 +222,7 @@ pipeline:
output: patch-validation.md
max_files: 10
max_lines: 900
max_delete_ratio: 0.70
on_fail: implement
- id: apply_patch
@ -347,6 +375,16 @@ Use fail when the patch is unsafe, unrelated, or clearly broken.
Use pass only when the acceptance criteria are satisfied.
"""
REAL_MODEL_DEBUGGER_PROMPT = """You are the debugger agent for NightShift.
Diagnose failed attempts without editing files.
Use the task, current patch, failure output, and retry history to produce:
- concise diagnosis
- recommended next action
- do not modify guidance
"""
IMAGEBOARD_README = """# NightShift Imageboard Target
This project was created with:

View File

@ -26,7 +26,7 @@ BANNER_MESSAGES = [
"WHO UP BREAKIN THEY BUILD?",
"me and the boys at 2am lookin for BEANS",
"local-first autonomous coding pipeline",
"why break then build while you're awake?",
"why break the build while you're awake?",
"compiling bad ideas into good software",
"local-first synthetic cognition",
"the graveyard shift for software engineering",

View File

@ -59,6 +59,34 @@ class InitProjectTests(unittest.TestCase):
self.assertIn("real-long-running", available_templates())
self.assertIn("real-simple", available_templates())
self.assertIn("tutorial-imageboard", available_templates())
self.assertIn("tutorial-pastebin", available_templates())
def test_init_pastebin_template_creates_app_and_model_fallback_config(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
init_project(root, template="tutorial-pastebin")
config = (root / "nightshift.yaml").read_text(encoding="utf-8")
self.assertTrue((root / ".nightshift" / "tasks.md").exists())
self.assertTrue((root / "src" / "pastebin_app" / "app.py").exists())
self.assertTrue((root / "tests" / "test_pastebin.py").exists())
self.assertIn("type: semantic_context", config)
self.assertIn("implementer_qwen", config)
self.assertIn("carstenuhlig/omnicoder-9b", config)
self.assertIn("deepseek-coder-v2:16b", config)
def test_pastebin_example_tutorial_docs_exist(self) -> None:
root = Path(__file__).resolve().parents[1]
tutorial = root / "examples" / "tutorial" / "03-pastebin"
self.assertTrue((tutorial / "README.md").exists())
self.assertTrue((tutorial / "tasks.md").exists())
self.assertTrue((tutorial / "nightshift.yaml").exists())
self.assertIn(
"nightshift init --template tutorial-pastebin",
(tutorial / "README.md").read_text(encoding="utf-8"),
)
def test_init_rejects_unknown_template(self) -> None:
with tempfile.TemporaryDirectory() as directory:

View File

@ -0,0 +1,145 @@
from pathlib import Path
from dataclasses import replace
import tempfile
import unittest
from nightshift.artifacts import ArtifactStore
from nightshift.config import parse_config, StageConfig
from nightshift.failures import classify_failure
from nightshift.integ import cleanup_integration_runs, create_integration_run
from nightshift.patches import validate_patch
from nightshift.pipeline import PipelineRunner
from nightshift.tasks import parse_tasks
from tests.test_pipeline import TASK_MD, make_config, _write_common_files
class ReliabilityFeatureTests(unittest.TestCase):
def test_failure_classifier_detects_missing_dependency(self) -> None:
result = classify_failure("ModuleNotFoundError: No module named 'flask'", exit_code=1)
self.assertEqual(result.category, "missing dependency")
self.assertIn("flask", result.probable_root_cause)
self.assertIn("do not retry", result.retry_recommendation)
def test_command_failure_writes_diagnostics_and_retry_memory(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
_write_common_files(root)
command = 'python -c "raise AssertionError(\'expected value\')"'
stages = (
StageConfig(
id="test",
type="command",
commands=(command,),
output="test-output.txt",
on_fail="plan",
),
StageConfig(id="plan", type="agent", agent="planner", output="plan.md"),
)
config = make_config(root, stages, max_retries=1)
config = replace(
config,
safety=replace(config.safety, allowed_commands=(command,)),
)
runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
result = runner.run_task(parse_tasks(TASK_MD)[0])
task_dir = root / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
self.assertEqual(result.status, "complete")
self.assertTrue((task_dir / "diagnostics" / "test-failure.md").exists())
self.assertTrue((task_dir / "retry-memory.md").exists())
self.assertTrue((task_dir / "escalation-policy.md").exists())
self.assertIn("test expectation mismatch", (task_dir / "diagnostics" / "test-failure.md").read_text(encoding="utf-8"))
def test_agent_blocked_request_generates_run_local_fixture(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
_write_common_files(root)
(root / "fake_agent.py").write_text(
"print('blocked_request: json fixtures/input.json missing json fixture')\n",
encoding="utf-8",
)
stages = (StageConfig(id="plan", type="agent", agent="planner", output="plan.md"),)
config = make_config(root, stages)
config.agents["planner"] = replace(
config.agents["planner"],
command="python fake_agent.py",
)
runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
result = runner.run_task(parse_tasks(TASK_MD)[0])
task_dir = root / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
self.assertEqual(result.status, "complete")
self.assertTrue((task_dir / "resources" / "fixtures" / "input.json").exists())
self.assertTrue((task_dir / "resource-requests.md").exists())
def test_config_parses_agent_pool_and_delete_ratio(self) -> None:
root = Path.cwd()
raw = {
"project": {"name": "x", "root": ".", "task_file": "tasks.md", "artifact_dir": ".nightshift"},
"safety": {"scoped_paths": ["."], "allowed_commands": [], "forbidden_commands": []},
"agents": {
"a": {"backend": "command", "command": "echo", "system_prompt": "a.md"},
"b": {"backend": "command", "command": "echo", "system_prompt": "b.md"},
},
"pipeline": {
"max_task_retries": 1,
"stages": [
{
"id": "write",
"type": "file_writer",
"agent_pool": ["a", "b"],
"max_delete_ratio": 0.5,
}
],
},
}
config = parse_config(raw, root / "nightshift.yaml")
self.assertEqual(config.pipeline.stages[0].agent, "a")
self.assertEqual(config.pipeline.stages[0].agent_pool, ("a", "b"))
self.assertEqual(config.pipeline.stages[0].max_delete_ratio, 0.5)
def test_patch_governor_rejects_deletion_heavy_patch(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
(root / "app.py").write_text("one\ntwo\nthree\n", encoding="utf-8")
patch = "\n".join(
[
"diff --git a/app.py b/app.py",
"--- a/app.py",
"+++ b/app.py",
"@@ -1,3 +1 @@",
"-one",
"-two",
"-three",
"+one",
"",
]
)
config = make_config(root, ())
with self.assertRaises(Exception) as raised:
validate_patch(patch, root, config.safety, max_delete_ratio=0.5)
self.assertIn("deletion-heavy", str(raised.exception))
def test_integration_run_creation_and_cleanup(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
first = create_integration_run(root, template="basic")
second = create_integration_run(root, template="basic")
removed = cleanup_integration_runs(root / "integ_runs", keep=1)
self.assertTrue(first.log_path.exists() or first.directory in removed)
self.assertTrue(second.directory.exists())
self.assertEqual(len(removed), 1)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,113 @@
from pathlib import Path
from dataclasses import replace
import tempfile
import unittest
from nightshift.artifacts import ArtifactStore
from nightshift.config import SafetyConfig, StageConfig
from nightshift.pipeline import PipelineRunner
from nightshift.semantic_index import build_semantic_index, search_index
from nightshift.tasks import parse_tasks
from nightshift.telemetry import estimate_tokens, format_telemetry_summary, telemetry_from_stage_output
from tests.test_pipeline import TASK_MD, make_config, _write_common_files
class TelemetryAndIndexTests(unittest.TestCase):
def test_telemetry_estimates_tokens_and_groups_by_model(self) -> None:
output = "\n".join(
[
"# Agent Output: plan",
"",
"Agent: `planner`",
"Duration seconds: 1.250",
"",
"## stdout",
"",
"```text",
"plan ok",
"```",
"",
"## Prompt",
"",
"```markdown",
"hello world",
"```",
]
)
entry = telemetry_from_stage_output(
stage_id="plan",
stage_type="agent",
status="pass",
output=output,
retry_count=0,
model="qwen2.5-coder:14b",
)
summary = format_telemetry_summary((entry,))
self.assertGreater(estimate_tokens("hello world"), 0)
self.assertEqual(entry.agent_id, "planner")
self.assertEqual(entry.duration_seconds, 1.25)
self.assertIn("qwen2.5-coder:14b", summary)
def test_pipeline_writes_telemetry_summary(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
_write_common_files(root)
stages = (StageConfig(id="plan", type="agent", agent="planner", output="plan.md"),)
config = make_config(root, stages)
runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
result = runner.run_task(parse_tasks(TASK_MD)[0])
task_dir = root / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
self.assertEqual(result.status, "complete")
self.assertTrue((task_dir / "telemetry-summary.md").exists())
self.assertTrue((root / ".nightshift" / "runs" / "test-run" / "telemetry-summary.md").exists())
def test_semantic_index_finds_symbols_and_tests(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
(root / "src").mkdir()
(root / "tests").mkdir()
(root / "src" / "service.py").write_text(
"import sqlite3\n\nclass SnippetStore:\n pass\n\ndef create_snippet():\n return True\n",
encoding="utf-8",
)
(root / "tests" / "test_service.py").write_text(
"def test_create_snippet():\n assert True\n",
encoding="utf-8",
)
safety = SafetyConfig(
require_clean_worktree=False,
scoped_paths=("src", "tests"),
allowed_commands=(),
forbidden_commands=(),
)
index = build_semantic_index(root, safety)
results = search_index(index, "create snippet sqlite")
self.assertTrue(any("create_snippet" in item.symbols for item in index))
self.assertTrue(any(item.path == "src/service.py" for item in results))
def test_semantic_context_stage_writes_artifacts(self) -> None:
with tempfile.TemporaryDirectory() as directory:
root = Path(directory)
_write_common_files(root)
(root / "snippet.py").write_text("def create_snippet():\n return 'ok'\n", encoding="utf-8")
stages = (StageConfig(id="semantic", type="semantic_context", output="semantic-context.md"),)
config = make_config(root, stages)
runner = PipelineRunner(config, ArtifactStore(root, ".nightshift", run_id="test-run"))
result = runner.run_task(parse_tasks(TASK_MD)[0])
task_dir = root / ".nightshift" / "runs" / "test-run" / "tasks" / "TASK-001"
self.assertEqual(result.status, "complete")
self.assertTrue((task_dir / "semantic-index.md").exists())
self.assertTrue((task_dir / "semantic-context.md").exists())
if __name__ == "__main__":
unittest.main()