fix(preview/stream): await onSpec/onError handlers
All checks were successful
Deploy to Production / deploy (push) Successful in 1m21s

The llm package called the user-supplied onSpec/onError handlers
without awaiting them. In the /preview/stream route onSpec is async
(it does `await cacheSpec(...)` then writes the SSE `spec` event), so
the api handler's `await streamSpecFromAnthropic(...)` returned BEFORE
the terminal event had been written. The route's finally block then
ran `reply.raw.end()`, the queued `send('spec', ...)` hit a closed
stream and silently no-op'd, and the browser saw zero terminal
events — the frontend ran into the "Spec generation failed." fallback
even though Anthropic had delivered a perfectly valid spec.

Verified against prod log: req-8 ran 66s with 200 and produced no
preview_spec_* log line, which is exactly the success-but-event-lost
signature.

Fix:
- StreamHandlers.onSpec / onError typed as Promise<void> | void
- Both call sites in streamSpecFromAnthropic now `await` them
- /preview/stream sets `resolved = true` at the END of each handler
  (after the SSE write completes) so the post-stream "unresolved"
  fallback only fires on a genuine programming bug
- Added preview_spec_ready info log on the happy path so future
  diagnosis doesn't have to infer success from the absence of error
  logs
This commit is contained in:
Marco Sadjadi 2026-05-28 22:00:03 +02:00
parent 29e699dc74
commit 092290bb38
2 changed files with 50 additions and 25 deletions

View File

@ -256,6 +256,12 @@ export async function serverRoutes(app: FastifyInstance): Promise<void> {
const abort = new AbortController(); const abort = new AbortController();
req.raw.on('close', () => abort.abort()); req.raw.on('close', () => abort.abort());
// `resolved` is set inside the awaited handlers below — by the time
// streamSpecFromAnthropic returns, exactly one of onSpec/onError will
// have completed (handlers are awaited inside the llm package), so the
// post-stream fallback `if (!resolved)` only fires if the stream truly
// ended without either handler running (which would be a programming
// bug, not a runtime path).
let resolved = false; let resolved = false;
await streamSpecFromAnthropic( await streamSpecFromAnthropic(
parsed.data.prompt, parsed.data.prompt,
@ -268,7 +274,6 @@ export async function serverRoutes(app: FastifyInstance): Promise<void> {
{ {
onText: (delta) => send('text', delta), onText: (delta) => send('text', delta),
onSpec: async ({ spec, source }) => { onSpec: async ({ spec, source }) => {
resolved = true;
const previewId = await cacheSpec(spec); const previewId = await cacheSpec(spec);
send('spec', { send('spec', {
previewId, previewId,
@ -289,9 +294,18 @@ export async function serverRoutes(app: FastifyInstance): Promise<void> {
scopes: spec.scopes, scopes: spec.scopes,
}, },
}); });
app.log.info(
{
previewId,
tools: spec.tools.length,
prompt: parsed.data.prompt.slice(0, 200),
model: choice.displayName,
},
'preview_spec_ready',
);
resolved = true;
}, },
onError: (err) => { onError: (err) => {
resolved = true;
if (err instanceof SpecTruncatedError) { if (err instanceof SpecTruncatedError) {
app.log.warn( app.log.warn(
{ {
@ -306,9 +320,7 @@ export async function serverRoutes(app: FastifyInstance): Promise<void> {
detail: detail:
'The spec for this prompt exceeded the maximum response size. Split it into fewer tools or describe one capability per prompt.', 'The spec for this prompt exceeded the maximum response size. Split it into fewer tools or describe one capability per prompt.',
}); });
return; } else if (err instanceof SpecValidationError) {
}
if (err instanceof SpecValidationError) {
app.log.warn( app.log.warn(
{ {
zod_message: err.message, zod_message: err.message,
@ -318,26 +330,24 @@ export async function serverRoutes(app: FastifyInstance): Promise<void> {
'preview_spec_invalid', 'preview_spec_invalid',
); );
send('error', { error: 'spec_invalid', detail: err.message }); send('error', { error: 'spec_invalid', detail: err.message });
return; } else if (err instanceof BannedPatternError) {
}
if (err instanceof BannedPatternError) {
send('error', { error: 'banned_pattern', detail: err.message }); send('error', { error: 'banned_pattern', detail: err.message });
return; } else if (err instanceof SpecTimeoutError) {
}
if (err instanceof SpecTimeoutError) {
send('error', { send('error', {
error: 'preview_timeout', error: 'preview_timeout',
detail: 'Spec generation took too long. Try a shorter, more specific prompt.', detail: 'Spec generation took too long. Try a shorter, more specific prompt.',
}); });
return; } else {
app.log.error(err);
send('error', { error: 'preview_failed', detail: err.message });
} }
app.log.error(err); resolved = true;
send('error', { error: 'preview_failed', detail: err.message });
}, },
}, },
); );
if (!resolved) { if (!resolved) {
app.log.error({ prompt: parsed.data.prompt.slice(0, 200) }, 'preview_stream_unresolved');
send('error', { error: 'preview_failed', detail: 'stream ended without a final event' }); send('error', { error: 'preview_failed', detail: 'stream ended without a final event' });
} }
clearInterval(keepalive); clearInterval(keepalive);

View File

@ -300,12 +300,22 @@ async function generateWithAnthropic(
// ────────────────────────────────────────────────────────────────────────── // ──────────────────────────────────────────────────────────────────────────
export interface StreamHandlers { export interface StreamHandlers {
/** Called for each text delta emitted by the model. */ /** Called for each text delta emitted by the model. Sync — must not throw. */
onText: (text: string) => void; onText: (text: string) => void;
/** Called once when the stream completes successfully with the final spec. */ /**
onSpec: (result: GenerationResult) => void; * Called once when the stream completes successfully with the final spec.
/** Called once on any terminal error (timeout, truncation, validation). */ * MAY return a Promise — the caller awaits it before considering the
onError: (err: Error) => void; * stream finished. This is critical for SSE callers that need to write
* a final event and end the response: if the returned Promise is not
* awaited, the caller can run response.end() before the event
* is actually written, leaving the client with no terminal frame.
*/
onSpec: (result: GenerationResult) => Promise<void> | void;
/**
* Called once on any terminal error (timeout, truncation, validation).
* Same async contract as onSpec.
*/
onError: (err: Error) => Promise<void> | void;
} }
/** /**
@ -358,13 +368,18 @@ export async function streamSpecFromAnthropic(
throw new SpecValidationError(`${parsed.error.message} :: raw="${preview}"`); throw new SpecValidationError(`${parsed.error.message} :: raw="${preview}"`);
} }
scanForInjection(parsed.data); scanForInjection(parsed.data);
handlers.onSpec({ spec: parsed.data, source: 'claude' }); // AWAITED on purpose — the SSE caller writes the terminal 'spec' event
// inside this handler and we must not return (and thereby allow the
// caller to .end() the response) until that write has completed.
await handlers.onSpec({ spec: parsed.data, source: 'claude' });
} catch (err) { } catch (err) {
if (err instanceof Anthropic.APIConnectionTimeoutError) { const terminal =
handlers.onError(new SpecTimeoutError('spec generation exceeded the time budget')); err instanceof Anthropic.APIConnectionTimeoutError
return; ? new SpecTimeoutError('spec generation exceeded the time budget')
} : err instanceof Error
handlers.onError(err instanceof Error ? err : new Error(String(err))); ? err
: new Error(String(err));
await handlers.onError(terminal);
} }
} }