import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { mkdtempSync, existsSync, readFileSync } from 'node:fs';
import { rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import Database from 'better-sqlite3';
import { migrateCritique, getCritiqueRun } from '../src/critique/persistence.js';
import { runOrchestrator, type CritiqueSseBus, type OrchestratorParams } from '../src/critique/orchestrator.js';
import type { CritiqueSseEvent } from '@open-design/contracts/critique';
import { defaultCritiqueConfig, type CritiqueConfig } from '@open-design/contracts/critique';

// NOTE(review): several wire-protocol fixtures below look like their markup
// was stripped (e.g. a stray `]]>` with no opening `<![CDATA[`, and an empty
// `malformedText` whose comment describes a ROUND element). The assertions
// depend on that markup — confirm the fixtures against the repository copy.

// ---------------------------------------------------------------------------
// DB fixture
// ---------------------------------------------------------------------------

/**
 * Opens an in-memory SQLite database with foreign keys enabled, seeds the
 * `projects` / `conversations` rows (`p1` / `c1`) that the runs below refer
 * to, and applies the critique migrations on top.
 *
 * No `journal_mode = WAL` pragma: an in-memory database only supports the
 * MEMORY or OFF journal modes, so that pragma was silently ignored.
 */
function freshDb(): Database.Database {
  const db = new Database(':memory:');
  db.pragma('foreign_keys = ON');
  db.exec(`
    CREATE TABLE projects (
      id TEXT PRIMARY KEY,
      name TEXT NOT NULL,
      created_at INTEGER NOT NULL,
      updated_at INTEGER NOT NULL
    );
    CREATE TABLE conversations (
      id TEXT PRIMARY KEY,
      project_id TEXT NOT NULL,
      created_at INTEGER NOT NULL,
      updated_at INTEGER NOT NULL,
      FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE
    );
    INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'p1', 0, 0);
    INSERT INTO conversations (id, project_id, created_at, updated_at) VALUES ('c1', 'p1', 0, 0);
  `);
  migrateCritique(db);
  return db;
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Creates an SSE bus that records every emitted event in emission order. */
function makeBus(): { bus: CritiqueSseBus; events: CritiqueSseEvent[] } {
  const events: CritiqueSseEvent[] = [];
  const bus: CritiqueSseBus = {
    emit: (e) => {
      events.push(e);
    },
  };
  return { bus, events };
}

/**
 * Builds a minimal 3-round happy-path wire protocol stream that ends in a
 * SHIP. Both callers drive it with an unmodified `defaultCritiqueConfig()`
 * and expect status=shipped.
 */
function happyStream3Rounds(): string {
  return ` Design intent v1. ]]> Good layout. Strong brand. Passes AA. Clear copy. Composite 9.0 but continuing per test. Design intent v2. Better. Consistent. Still passes. Still clear. Continuing to round 3. Design intent v3. Excellent. Perfect. Excellent. Great. Threshold met. final]]> Design converged in 3 rounds. `;
}

/**
 * Replays `text` as an async stream of `chunkSize`-character chunks,
 * simulating CLI stdout that arrives in pieces.
 *
 * The first `next()` rejects with a RangeError if `chunkSize` is not
 * positive; otherwise the loop would never advance.
 */
async function* streamOf(text: string, chunkSize = 64): AsyncIterable<string> {
  if (chunkSize <= 0) {
    throw new RangeError(`streamOf: chunkSize must be positive, got ${chunkSize}`);
  }
  for (let i = 0; i < text.length; i += chunkSize) {
    yield text.slice(i, i + chunkSize);
  }
}

// ---------------------------------------------------------------------------
// Setup / Teardown
// ---------------------------------------------------------------------------

let tmpDir: string;
let db: Database.Database;

beforeEach(() => {
  tmpDir = mkdtempSync(join(tmpdir(), 'od-orch-test-'));
  db = freshDb();
});

afterEach(async () => {
  db.close();
  await rm(tmpDir, { recursive: true, force: true });
});

// ---------------------------------------------------------------------------
// Happy path
// ---------------------------------------------------------------------------

describe('runOrchestrator - happy path', () => {
  it('3-round shipped run: row reflects shipped + composite + rounds + transcript path', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run1');
    const cfg = defaultCritiqueConfig();

    const result = await runOrchestrator({
      runId: 'r1',
      projectId: 'p1',
      conversationId: 'c1',
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: streamOf(happyStream3Rounds()),
    });

    expect(result.status).toBe('shipped');
    expect(result.composite).toBeCloseTo(9.45, 1);
    expect(result.rounds).toHaveLength(3);
    expect(result.transcriptPath).toBeTruthy();

    const row = getCritiqueRun(db, 'r1');
    expect(row?.status).toBe('shipped');
    expect(row?.rounds).toHaveLength(3);
    expect(row?.transcriptPath).toBeTruthy();

    // Transcript file exists on disk. The `!` is safe because the
    // toBeTruthy() above has already failed the test if the path is missing.
    const transcriptFile = join(artifactDir, result.transcriptPath!);
    expect(existsSync(transcriptFile)).toBe(true);

    // SSE events emitted: should include run_started and ship.
    const eventNames = events.map((e) => e.event);
    expect(eventNames).toContain('critique.run_started');
    expect(eventNames).toContain('critique.ship');
  });

  it('SSE events are emitted in source order', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-order');

    await runOrchestrator({
      runId: 'r-order',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg: defaultCritiqueConfig(),
      db,
      bus,
      stdout: streamOf(happyStream3Rounds()),
    });

    const names = events.map((e) => e.event);
    const runStartedIdx = names.indexOf('critique.run_started');
    const shipIdx = names.lastIndexOf('critique.ship');
    expect(runStartedIdx).toBe(0);
    expect(shipIdx).toBeGreaterThan(runStartedIdx);
  });
});

// ---------------------------------------------------------------------------
// Malformed / degraded
// ---------------------------------------------------------------------------

describe('runOrchestrator - degraded', () => {
  it('malformed input: row is degraded, critique.degraded emitted, no transcript path in row (transcript may still be written)', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-malformed');

    // Malformed: ROUND before CRITIQUE_RUN.
    // NOTE(review): the literal is empty here, which does not match the
    // comment above — the markup looks stripped; confirm the fixture.
    const malformedText = ``;

    const result = await runOrchestrator({
      runId: 'r-malformed',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg: defaultCritiqueConfig(),
      db,
      bus,
      stdout: streamOf(malformedText),
    });

    expect(result.status).toBe('degraded');
    const row = getCritiqueRun(db, 'r-malformed');
    expect(row?.status).toBe('degraded');
    const degradedEvents = events.filter((e) => e.event === 'critique.degraded');
    expect(degradedEvents).toHaveLength(1);
  });
});

// ---------------------------------------------------------------------------
// Fallback policy
// ---------------------------------------------------------------------------

describe('runOrchestrator - fallback policy', () => {
  it('below threshold: stream ends without SHIP, ship_best selects highest composite', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-below');

    // 2 rounds but no SHIP - scores below threshold (default 8.0).
    const noShipText = ` v1 ]]> needs work Fix hierarchy ok ok ok Below threshold. v2 better ok ok ok Still below threshold. `;
    const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), fallbackPolicy: 'ship_best' };

    const result = await runOrchestrator({
      runId: 'r-below',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: streamOf(noShipText),
    });

    expect(result.status).toBe('below_threshold');
    // ship_best should select round 2 (composite 7.0 > 6.0).
    expect(result.composite).toBeGreaterThan(6.0);
    const row = getCritiqueRun(db, 'r-below');
    expect(row?.status).toBe('below_threshold');
    const shipEvents = events.filter((e) => e.event === 'critique.ship');
    expect(shipEvents).toHaveLength(1);
  });

  it('fallback policy fail: row is failed, no synthetic ship event', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-failpolicy');

    const noShipText = ` v1 ]]> ok ok ok ok Below threshold.\n`;
    const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), fallbackPolicy: 'fail' };

    const result = await runOrchestrator({
      runId: 'r-failpolicy',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: streamOf(noShipText),
    });

    expect(result.status).toBe('failed');
    const row = getCritiqueRun(db, 'r-failpolicy');
    expect(row?.status).toBe('failed');
    const shipEvents = events.filter((e) => e.event === 'critique.ship');
    expect(shipEvents).toHaveLength(0);
  });
});

// ---------------------------------------------------------------------------
// Timeout
// ---------------------------------------------------------------------------

describe('runOrchestrator - timeouts', () => {
  it('per-round timeout: stalled stream causes timed_out row', async () => {
    const { bus } = makeBus();
    const artifactDir = join(tmpDir, 'run-round-timeout');

    // Source that yields initial data, then stalls past the 50ms per-round
    // timeout. The stall rejects after 200ms so the generator always settles.
    async function* stallingSource(): AsyncIterable<string> {
      yield '\n';
      yield ' \n';
      yield ' \n';
      yield ' v1\n';
      yield ' ]]>\n';
      yield ' \n';
      // Stall: never send ROUND_END, timeout will fire.
      await new Promise<never>((_, reject) => setTimeout(() => reject(new Error('stall')), 200));
    }

    const cfg: CritiqueConfig = {
      ...defaultCritiqueConfig(),
      perRoundTimeoutMs: 50,
      totalTimeoutMs: 60_000,
    };

    const result = await runOrchestrator({
      runId: 'r-round-timeout',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: stallingSource(),
    });

    expect(result.status).toBe('timed_out');
    const row = getCritiqueRun(db, 'r-round-timeout');
    expect(row?.status).toBe('timed_out');
  }, 5000);

  it('total timeout: wall-clock deadline exceeded causes timed_out row', async () => {
    const { bus } = makeBus();
    const artifactDir = join(tmpDir, 'run-total-timeout');

    // One chunk, then a 200ms stall that outlives the 50ms total timeout.
    async function* slowSource(): AsyncIterable<string> {
      yield '\n';
      await new Promise<never>((_, reject) => setTimeout(() => reject(new Error('total stall')), 200));
    }

    const cfg: CritiqueConfig = {
      ...defaultCritiqueConfig(),
      perRoundTimeoutMs: 60_000,
      totalTimeoutMs: 50,
    };

    const result = await runOrchestrator({
      runId: 'r-total-timeout',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: slowSource(),
    });

    expect(result.status).toBe('timed_out');
    const row = getCritiqueRun(db, 'r-total-timeout');
    expect(row?.status).toBe('timed_out');
  }, 5000);
});

// ---------------------------------------------------------------------------
// Abort signal
// ---------------------------------------------------------------------------

describe('runOrchestrator - abort signal', () => {
  it('abort mid-run: row is interrupted, transcript captures events seen so far', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-abort');
    const controller = new AbortController();

    async function* abortingSource(): AsyncIterable<string> {
      yield '\n';
      yield ' \n';
      yield ' \n';
      yield ' v1\n';
      yield ' ]]>\n';
      yield ' \n';
      // Abort mid-stream.
      controller.abort();
      yield ' \n';
    }

    const result = await runOrchestrator({
      runId: 'r-abort',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg: defaultCritiqueConfig(),
      db,
      bus,
      stdout: abortingSource(),
      signal: controller.signal,
    });

    expect(result.status).toBe('interrupted');
    const row = getCritiqueRun(db, 'r-abort');
    expect(row?.status).toBe('interrupted');

    // Transcript should exist with partial events. NOTE(review): this check
    // only runs when a transcript path is returned, so it passes vacuously
    // if none is written.
    if (result.transcriptPath) {
      expect(existsSync(join(artifactDir, result.transcriptPath))).toBe(true);
    }

    const interruptedEvents = events.filter((e) => e.event === 'critique.interrupted');
    expect(interruptedEvents).toHaveLength(1);
  });
});

// ---------------------------------------------------------------------------
// Defensive entry validation
// ---------------------------------------------------------------------------

describe('runOrchestrator - defensive entry', () => {
  it('throws RangeError on invalid cfg (negative scoreThreshold) before any side effects', async () => {
    const { bus } = makeBus();
    const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), scoreThreshold: -1 };

    await expect(
      runOrchestrator({
        runId: 'r-invalid',
        projectId: 'p1',
        conversationId: null,
        artifactId: 'a1',
        artifactDir: join(tmpDir, 'run-invalid'),
        adapter: 'claude',
        cfg,
        db,
        bus,
        stdout: streamOf(''),
      }),
    ).rejects.toThrow(RangeError);

    // No row should have been inserted.
    expect(getCritiqueRun(db, 'r-invalid')).toBeNull();
  });

  it('throws RangeError on invalid cfg (zero perRoundTimeoutMs)', async () => {
    const { bus } = makeBus();
    const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), perRoundTimeoutMs: 0 };

    await expect(
      runOrchestrator({
        runId: 'r-invalid2',
        projectId: 'p1',
        conversationId: null,
        artifactId: 'a1',
        artifactDir: join(tmpDir, 'run-invalid2'),
        adapter: 'claude',
        cfg,
        db,
        bus,
        stdout: streamOf(''),
      }),
    ).rejects.toThrow(RangeError);

    expect(getCritiqueRun(db, 'r-invalid2')).toBeNull();
  });
});

// ---------------------------------------------------------------------------
// Child exit races (Defect 4)
// ---------------------------------------------------------------------------

describe('runOrchestrator - child exit race (Defect 4)', () => {
  it('child exits non-zero mid-stream: result is failed with cli_exit_nonzero', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-child-exit');

    // Stub child process: only records whether the orchestrator tried to
    // kill it. The "exit" itself is simulated by childExitPromise below.
    let killCalled = false;
    const stubChild = {
      kill: (_sig?: number | NodeJS.Signals): boolean => {
        killCalled = true;
        return true;
      },
    };

    // childExitPromise resolves with code=1 after 20ms, while stdout is
    // still open.
    const childExitPromise = new Promise<{ code: number | null; signal: string | null }>(
      (resolve) => setTimeout(() => resolve({ code: 1, signal: null }), 20),
    );

    // Stdout that emits the run header, then stalls for 5s. That is far
    // longer than the 20ms child exit, so the child-exit race wins. The
    // stall still resolves eventually, so the generator can be cleaned up
    // by iter.return() in applyTimeouts instead of hanging.
    async function* stallingStdout(): AsyncIterable<string> {
      yield '\n';
      yield ' \n';
      await new Promise<void>((resolve) => setTimeout(resolve, 5000));
    }

    const cfg: CritiqueConfig = {
      ...defaultCritiqueConfig(),
      // Long timeouts so only the child exit race wins; we don't want the
      // per-round or total timer to fire before childExitPromise resolves.
      perRoundTimeoutMs: 30_000,
      totalTimeoutMs: 30_000,
    };

    const result = await runOrchestrator({
      runId: 'r-child-exit',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: stallingStdout(),
      child: stubChild,
      childExitPromise,
    });

    expect(result.status).toBe('failed');
    const row = getCritiqueRun(db, 'r-child-exit');
    expect(row?.status).toBe('failed');
    expect(killCalled).toBe(true);
    // NOTE(review): the title mentions cli_exit_nonzero, but only the event
    // count is asserted, not the failure reason.
    const failedEvents = events.filter((e) => e.event === 'critique.failed');
    expect(failedEvents).toHaveLength(1);
  }, 10000);

  it('child exits zero before parser completes: parser continues until stream ends', async () => {
    const { bus } = makeBus();
    const artifactDir = join(tmpDir, 'run-child-exit-zero');

    // Child exits with code 0 (zero is not an error).
    const childExitPromise = new Promise<{ code: number | null; signal: string | null }>(
      (resolve) => setTimeout(() => resolve({ code: 0, signal: null }), 10),
    );

    // A complete 1-round stream whose first chunk arrives 30ms in, i.e.
    // after the 10ms child exit.
    async function* delayedStream(): AsyncIterable<string> {
      await new Promise<void>((r) => setTimeout(r, 30));
      yield '\n';
      yield ' \n';
      yield ' v1v1\n\n]]>\n\n';
      yield ' ok\n';
      yield ' ok\n';
      yield ' ok\n';
      yield ' ok\n';
      yield ' ok\n';
      yield '\n\n';
      yield ' \n';
      yield ' final\n\n]]>\n\n';
      yield ' done\n';
      yield '\n\n';
      yield '\n\n';
    }

    const result = await runOrchestrator({
      runId: 'r-child-exit-zero',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg: defaultCritiqueConfig(),
      db,
      bus,
      stdout: delayedStream(),
      childExitPromise,
    });

    // Zero exit does not disrupt the parser; it should complete as shipped.
    expect(result.status).toBe('shipped');
  }, 10000);
});

// ---------------------------------------------------------------------------
// Timeout / abort best-so-far fallback (Defect 7)
// ---------------------------------------------------------------------------

describe('runOrchestrator - fallback on timeout/abort (Defect 7)', () => {
  it('timeout after 2 completed rounds: status=timed_out, score=max(composite)', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-timeout-fallback');

    // Two complete rounds. After both ROUND_END events, the source stalls so
    // the total timeout fires and the orchestrator elects a fallback round.
    const twoRounds = ` v1v1\n\n]]>\nok ok ok ok continue\nv2 better ok ok ok continue `;

    async function* stallingAfterTwoRounds(): AsyncIterable<string> {
      yield* streamOf(twoRounds, 64);
      // After both rounds land, stall so the total-timeout fires.
      await new Promise<void>((resolve) => setTimeout(resolve, 10_000));
    }

    const cfg: CritiqueConfig = {
      ...defaultCritiqueConfig(),
      // The per-round timeout is set far beyond this test's 15s budget, so
      // only the 300ms total timeout can end the run during the stall.
      perRoundTimeoutMs: 60_000,
      totalTimeoutMs: 300,
      fallbackPolicy: 'ship_best',
    };

    const result = await runOrchestrator({
      runId: 'r-timeout-fallback',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: stallingAfterTwoRounds(),
    });

    expect(result.status).toBe('timed_out');
    // Best round is 2 with composite 7.5.
    expect(result.composite).toBeCloseTo(7.5, 1);
    const row = getCritiqueRun(db, 'r-timeout-fallback');
    expect(row?.status).toBe('timed_out');
    expect(row?.score).toBeCloseTo(7.5, 1);
    // A synthetic ship event should have been emitted.
    const shipEvents = events.filter((e) => e.event === 'critique.ship');
    expect(shipEvents).toHaveLength(1);
  }, 15000);

  it('abort after 1 completed round: status=interrupted, score matches that round', async () => {
    const { bus, events } = makeBus();
    const artifactDir = join(tmpDir, 'run-abort-fallback');
    const controller = new AbortController();

    const oneRound = ` v1v1\n\n]]>\nok ok ok ok continue\n`;

    async function* abortAfterRound(): AsyncIterable<string> {
      yield* streamOf(oneRound, 64);
      controller.abort();
      // One more yield after abort to ensure the abort is caught.
      yield ' \n';
    }

    const cfg: CritiqueConfig = {
      ...defaultCritiqueConfig(),
      fallbackPolicy: 'ship_best',
    };

    const result = await runOrchestrator({
      runId: 'r-abort-fallback',
      projectId: 'p1',
      conversationId: null,
      artifactId: 'a1',
      artifactDir,
      adapter: 'claude',
      cfg,
      db,
      bus,
      stdout: abortAfterRound(),
      signal: controller.signal,
    });

    expect(result.status).toBe('interrupted');
    expect(result.composite).toBeCloseTo(8.0, 1);
    const row = getCritiqueRun(db, 'r-abort-fallback');
    expect(row?.status).toBe('interrupted');
    expect(row?.score).toBeCloseTo(8.0, 1);
    const shipEvents = events.filter((e) => e.event === 'critique.ship');
    expect(shipEvents).toHaveLength(1);
  });
});