diff --git a/apps/api/src/routes/servers.ts b/apps/api/src/routes/servers.ts index b746de3..1bee0b5 100644 --- a/apps/api/src/routes/servers.ts +++ b/apps/api/src/routes/servers.ts @@ -256,6 +256,12 @@ export async function serverRoutes(app: FastifyInstance): Promise { const abort = new AbortController(); req.raw.on('close', () => abort.abort()); + // `resolved` is set inside the awaited handlers below — by the time + // streamSpecFromAnthropic returns, exactly one of onSpec/onError will + // have completed (handlers are awaited inside the llm package), so the + // post-stream fallback `if (!resolved)` only fires if the stream truly + // ended without either handler running (which would be a programming + // bug, not a runtime path). let resolved = false; await streamSpecFromAnthropic( parsed.data.prompt, @@ -268,7 +274,6 @@ export async function serverRoutes(app: FastifyInstance): Promise { { onText: (delta) => send('text', delta), onSpec: async ({ spec, source }) => { - resolved = true; const previewId = await cacheSpec(spec); send('spec', { previewId, @@ -289,9 +294,18 @@ export async function serverRoutes(app: FastifyInstance): Promise { scopes: spec.scopes, }, }); + app.log.info( + { + previewId, + tools: spec.tools.length, + prompt: parsed.data.prompt.slice(0, 200), + model: choice.displayName, + }, + 'preview_spec_ready', + ); + resolved = true; }, onError: (err) => { - resolved = true; if (err instanceof SpecTruncatedError) { app.log.warn( { @@ -306,9 +320,7 @@ export async function serverRoutes(app: FastifyInstance): Promise { detail: 'The spec for this prompt exceeded the maximum response size. Split it into fewer tools or describe one capability per prompt.', }); - return; - } - if (err instanceof SpecValidationError) { + } else if (err instanceof SpecValidationError) { app.log.warn( { zod_message: err.message, @@ -318,26 +330,24 @@ export async function serverRoutes(app: FastifyInstance): Promise { 'preview_spec_invalid', ); send('error', { error: 'spec_invalid', detail: err.message }); - return; - } - if (err instanceof BannedPatternError) { + } else if (err instanceof BannedPatternError) { send('error', { error: 'banned_pattern', detail: err.message }); - return; - } - if (err instanceof SpecTimeoutError) { + } else if (err instanceof SpecTimeoutError) { send('error', { error: 'preview_timeout', detail: 'Spec generation took too long. Try a shorter, more specific prompt.', }); - return; + } else { + app.log.error(err); + send('error', { error: 'preview_failed', detail: err.message }); } - app.log.error(err); - send('error', { error: 'preview_failed', detail: err.message }); + resolved = true; }, }, ); if (!resolved) { + app.log.error({ prompt: parsed.data.prompt.slice(0, 200) }, 'preview_stream_unresolved'); send('error', { error: 'preview_failed', detail: 'stream ended without a final event' }); } clearInterval(keepalive); diff --git a/packages/llm/src/index.ts b/packages/llm/src/index.ts index c434672..da147b4 100644 --- a/packages/llm/src/index.ts +++ b/packages/llm/src/index.ts @@ -300,12 +300,22 @@ async function generateWithAnthropic( // ────────────────────────────────────────────────────────────────────────── export interface StreamHandlers { - /** Called for each text delta emitted by the model. */ + /** Called for each text delta emitted by the model. Sync — must not throw. */ onText: (text: string) => void; - /** Called once when the stream completes successfully with the final spec. */ - onSpec: (result: GenerationResult) => void; - /** Called once on any terminal error (timeout, truncation, validation). */ - onError: (err: Error) => void; + /** + * Called once when the stream completes successfully with the final spec. + * MAY return a Promise — the caller awaits it before considering the + * stream finished. This is critical for SSE callers that need to write + * a final event and end the response: returning a void instead of + * Promise would leak the response.end() call before the event + * is actually written, leaving the client with no terminal frame. + */ + onSpec: (result: GenerationResult) => Promise | void; + /** + * Called once on any terminal error (timeout, truncation, validation). + * Same async contract as onSpec. + */ + onError: (err: Error) => Promise | void; } /** @@ -358,13 +368,18 @@ export async function streamSpecFromAnthropic( throw new SpecValidationError(`${parsed.error.message} :: raw="${preview}"`); } scanForInjection(parsed.data); - handlers.onSpec({ spec: parsed.data, source: 'claude' }); + // AWAITED on purpose — the SSE caller writes the terminal 'spec' event + // inside this handler and we must not return (and thereby allow the + // caller to .end() the response) until that write has completed. + await handlers.onSpec({ spec: parsed.data, source: 'claude' }); } catch (err) { - if (err instanceof Anthropic.APIConnectionTimeoutError) { - handlers.onError(new SpecTimeoutError('spec generation exceeded the time budget')); - return; - } - handlers.onError(err instanceof Error ? err : new Error(String(err))); + const terminal = + err instanceof Anthropic.APIConnectionTimeoutError + ? new SpecTimeoutError('spec generation exceeded the time budget') + : err instanceof Error + ? err + : new Error(String(err)); + await handlers.onError(terminal); } }