From 0c6d738a6baeae89bbf6807432e27d17e0bb0b13 Mon Sep 17 00:00:00 2001
From: Marco Sadjadi
Date: Thu, 28 May 2026 21:11:05 +0200
Subject: [PATCH] feat(preview): SSE-streamed generation, no CF 100s edge cap
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Architectural fix for "spec_too_large" / preview_timeout — the sync
endpoint had to fit the whole model run into Cloudflare's ~100s edge
window, which made the system fragile against any prompt that produced
a verbose spec. The new streaming path pipes Anthropic's token deltas
as Server-Sent Events; every chunk resets CF's idle timer and a 15s
keepalive comment guarantees activity even during slow first-token
windows.

@bmm/llm: new streamSpecFromAnthropic() exposes the SDK's .stream()
flow with the same typed-error contract as generateSpec — the same
SpecTruncatedError / SpecValidationError / SpecTimeoutError, delivered
through the onError handler at the point where each is detected.

API: POST /v1/servers/preview/stream returns text/event-stream with
events 'text' (deltas), 'spec' (final success payload, same shape as
the sync endpoint), 'error' (typed). Anthropic-only — GLM/hobby falls
back to the sync route via 409 streaming_unavailable.

Frontend: apiSseStream() handles the POST + ReadableStream + SSE
parser. The wizard's analyze() prefers the stream and only uses the
sync endpoint on the explicit 409 fallback.

nginx (api.buildmymcpserver.com): the /v1/builds/ location block
(which already had proxy_buffering off + 600s read timeout for the WS
build stream) now also matches /v1/servers/preview/stream so the SSE
response isn't buffered.
---
 apps/api/src/routes/servers.ts                | 160 ++++++++++++++++++
 apps/web/app/(dashboard)/servers/new/page.tsx |  77 +++++++--
 apps/web/lib/api.ts                           |  93 +++++++++++
 packages/llm/src/index.ts                     |  73 ++++++++
 4 files changed, 391 insertions(+), 12 deletions(-)

diff --git a/apps/api/src/routes/servers.ts b/apps/api/src/routes/servers.ts
index a3a3b4a..fad546c 100644
--- a/apps/api/src/routes/servers.ts
+++ b/apps/api/src/routes/servers.ts
@@ -19,6 +19,7 @@ import {
   generateSpec,
   pickPreviewModel,
   scanForInjection,
+  streamSpecFromAnthropic,
 } from '@bmm/llm';
 import {
   BuildEvent,
@@ -174,6 +175,165 @@ export async function serverRoutes(app: FastifyInstance): Promise {
     }
   });
 
+  // Streaming preview — pipes the model's text deltas back as Server-Sent
+  // Events so Cloudflare's ~100s edge cap is irrelevant: every chunk we
+  // write resets the idle timer. The final event is either `spec` (success)
+  // or `error` (any of the typed errors raised by the LLM layer).
+  //
+  // Anthropic-only for now: GLM doesn't ship a clean streaming JSON
+  // contract that justifies the duplication, and hobby's 4096-token budget
+  // already fits the sync path comfortably. The route falls back to the
+  // sync endpoint if the request comes from a non-Anthropic tier.
+  app.post('/v1/servers/preview/stream', { preHandler: requireAuth }, async (req, reply) => {
+    const user = req.user!;
+    const parsed = PreviewInput.safeParse(req.body);
+    if (!parsed.success) {
+      return reply.code(400).send({ error: 'invalid_input', issues: parsed.error.flatten() });
+    }
+
+    const billing = await getOrgBilling(user.orgId);
+    if (billing.suspended) {
+      return reply.code(402).send({
+        error: 'subscription_suspended',
+        detail:
+          billing.suspendedReason === 'payment_failed'
+            ? 'Your subscription is paused due to a payment issue. Update your payment method in /settings/billing.'
+            : 'Your subscription is paused. Visit /settings/billing for details.',
+        suspendedReason: billing.suspendedReason,
+      });
+    }
+    const plan = billing.plan;
+
+    const rl = await checkDailyLimit('preview', user.userId, PREVIEW_DAILY_LIMIT[plan]);
+    if (!rl.ok) {
+      return reply.code(429).send({
+        error: 'rate_limited',
+        detail: `Daily preview limit reached for plan "${plan}" (${PREVIEW_DAILY_LIMIT[plan]}/day). Resets in ${Math.ceil(rl.resetIn / 3600)}h.`,
+        plan,
+        limit: PREVIEW_DAILY_LIMIT[plan],
+        resetIn: rl.resetIn,
+      });
+    }
+
+    const choice = pickPreviewModel(plan);
+    if (choice.provider !== 'anthropic' || !config.ANTHROPIC_API_KEY) {
+      return reply.code(409).send({
+        error: 'streaming_unavailable',
+        detail: 'Streaming preview is only available for Anthropic-backed tiers. Use POST /v1/servers/preview instead.',
+      });
+    }
+
+    // SSE response. X-Accel-Buffering disables nginx's response buffering
+    // so each chunk lands at the client immediately rather than after the
+    // full response is built — critical for the keepalive-vs-CF-100s logic
+    // to actually work.
+    reply.raw.setHeader('Content-Type', 'text/event-stream');
+    reply.raw.setHeader('Cache-Control', 'no-cache, no-transform');
+    reply.raw.setHeader('Connection', 'keep-alive');
+    reply.raw.setHeader('X-Accel-Buffering', 'no');
+    reply.raw.flushHeaders();
+
+    const send = (event: string, data: unknown) => {
+      reply.raw.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
+    };
+
+    // Heartbeat comment every 15s. Cloudflare's edge keeps the connection
+    // open as long as bytes flow; comments are SSE-noop but count as bytes.
+    const keepalive = setInterval(() => reply.raw.write(`: ping\n\n`), 15_000);
+    const abort = new AbortController();
+    // Abort the upstream model call when the client goes away. Listen on the
+    // response: since Node 16 the request's 'close' fires once the request
+    // has completed, not when the underlying socket is closed.
+    reply.raw.on('close', () => abort.abort());
+
+    let resolved = false;
+    await streamSpecFromAnthropic(
+      parsed.data.prompt,
+      {
+        apiKey: config.ANTHROPIC_API_KEY,
+        model: choice.model,
+        maxTokens: choice.maxTokens,
+        signal: abort.signal,
+      },
+      {
+        onText: (delta) => send('text', delta),
+        onSpec: async ({ spec, source }) => {
+          resolved = true;
+          const previewId = await cacheSpec(spec);
+          send('spec', {
+            previewId,
+            source,
+            plan,
+            modelDisplayName: choice.displayName,
+            modelBadge: choice.displayBadge,
+            upgradeHint: plan === 'hobby',
+            spec: {
+              name: spec.name,
+              description: spec.description,
+              tools: spec.tools.map((t) => ({
+                name: t.name,
+                description: t.description,
+                inputSchema: t.inputSchema,
+              })),
+              requiredSecrets: spec.requiredSecrets,
+              scopes: spec.scopes,
+            },
+          });
+        },
+        onError: (err) => {
+          resolved = true;
+          if (err instanceof SpecTruncatedError) {
+            app.log.warn(
+              {
+                reason: err.message,
+                prompt: parsed.data.prompt.slice(0, 200),
+                model: choice.displayName,
+              },
+              'preview_spec_truncated',
+            );
+            send('error', {
+              error: 'spec_too_large',
+              detail:
+                'The spec for this prompt exceeded the maximum response size. Split it into fewer tools or describe one capability per prompt.',
+            });
+            return;
+          }
+          if (err instanceof SpecValidationError) {
+            app.log.warn(
+              {
+                zod_message: err.message,
+                prompt: parsed.data.prompt.slice(0, 200),
+                model: choice.displayName,
+              },
+              'preview_spec_invalid',
+            );
+            send('error', { error: 'spec_invalid', detail: err.message });
+            return;
+          }
+          if (err instanceof BannedPatternError) {
+            send('error', { error: 'banned_pattern', detail: err.message });
+            return;
+          }
+          if (err instanceof SpecTimeoutError) {
+            send('error', {
+              error: 'preview_timeout',
+              detail: 'Spec generation took too long. Try a shorter, more specific prompt.',
+            });
+            return;
+          }
+          app.log.error(err);
+          send('error', { error: 'preview_failed', detail: err.message });
+        },
+      },
+    );
+
+    if (!resolved) {
+      send('error', { error: 'preview_failed', detail: 'stream ended without a final event' });
+    }
+    clearInterval(keepalive);
+    reply.raw.end();
+  });
+
   app.post('/v1/servers', { preHandler: requireAuth }, async (req, reply) => {
     const user = req.user!;
     const parsed = CreateServerInput.safeParse(req.body);
diff --git a/apps/web/app/(dashboard)/servers/new/page.tsx b/apps/web/app/(dashboard)/servers/new/page.tsx
index 786dbec..7818f77 100644
--- a/apps/web/app/(dashboard)/servers/new/page.tsx
+++ b/apps/web/app/(dashboard)/servers/new/page.tsx
@@ -5,7 +5,7 @@ import { Input, Label, Textarea } from '@/components/input';
 import { InstallSnippets } from '@/components/install-snippets';
 import { StreamingLogs } from '@/components/streaming-logs';
 import { Button } from '@/components/ui/button';
-import { apiFetch } from '@/lib/api';
+import { apiFetch, apiSseStream } from '@/lib/api';
 import { Loader2, RotateCcw, X } from 'lucide-react';
 import Link from 'next/link';
 import { useRouter, useSearchParams } from 'next/navigation';
@@ -241,19 +241,72 @@ function NewServerPageInner() {
       return;
     }
     setStep('analyzing');
-    try {
-      const res = await apiFetch('/v1/servers/preview', {
-        method: 'POST',
-        body: JSON.stringify({ prompt }),
-      });
-      setPreview(res);
-      setEditable(null); // will re-init via useEffect
+
+    // Streaming preview: pipes Anthropic's token deltas back as SSE. Cloudflare's
+    // ~100s edge cap doesn't bite because every chunk we receive resets the
+    // idle timer; the only practical limit is the model's own runtime. If the
+    // backend returns 409 streaming_unavailable (e.g. hobby/GLM tier), we fall
+    // back to the sync endpoint so the wizard still works there.
+    let finalResolved = false;
+    let sseError: { error?: string; detail?: string } | null = null;
+    let sseSpec: PreviewResponse | null = null;
+
+    await apiSseStream(
+      '/v1/servers/preview/stream',
+      { prompt },
+      {
+        onEvent: (event, data) => {
+          if (event === 'spec') {
+            finalResolved = true;
+            sseSpec = data as PreviewResponse;
+          } else if (event === 'error') {
+            finalResolved = true;
+            sseError = data as { error?: string; detail?: string };
+          }
+          // 'text' deltas are ignored for now — the wizard already shows a
+          // spinner. We could surface partial JSON later if useful.
+        },
+        onError: (err) => {
+          // Non-2xx responses carry the API's JSON body on `err.detail`; keep its
+          // `error` code so the 409 streaming_unavailable fallback can match.
+          const body = (err as { detail?: { error?: string; detail?: string } }).detail;
+          sseError = body ?? { detail: err.message };
+        },
+      },
+    );
+
+    if (finalResolved && sseSpec) {
+      setPreview(sseSpec);
+      setEditable(null);
       setStep('confirm');
-    } catch (e) {
-      const detail = (e as { detail?: { error?: string; detail?: string } }).detail;
-      setError(detail?.detail ?? detail?.error ?? (e as Error).message);
-      setStep('prompt');
+      return;
     }
+
+    if (sseError && (sseError as { error?: string }).error === 'streaming_unavailable') {
+      // GLM / mock tier — fall back to sync.
+      try {
+        const res = await apiFetch('/v1/servers/preview', {
+          method: 'POST',
+          body: JSON.stringify({ prompt }),
+        });
+        setPreview(res);
+        setEditable(null);
+        setStep('confirm');
+        return;
+      } catch (e) {
+        const detail = (e as { detail?: { error?: string; detail?: string } }).detail;
+        setError(detail?.detail ?? detail?.error ?? (e as Error).message);
+        setStep('prompt');
+        return;
+      }
+    }
+
+    setError(
+      (sseError as { detail?: string } | null)?.detail ??
+        (sseError as { error?: string } | null)?.error ??
+        'Spec generation failed.',
+    );
+    setStep('prompt');
   }
 
   function updateTool(i: number, patch: Partial) {
diff --git a/apps/web/lib/api.ts b/apps/web/lib/api.ts
index 6558b87..21c5385 100644
--- a/apps/web/lib/api.ts
+++ b/apps/web/lib/api.ts
@@ -33,3 +33,96 @@ export function apiWebSocketURL(path: string): string {
   const wsBase = httpBase.replace(/^http/, 'ws');
   return `${wsBase}${path}`;
 }
+
+export interface SseHandlers {
+  onEvent: (event: string, data: unknown) => void;
+  onError?: (err: Error) => void;
+}
+
+/**
+ * POST with SSE response. Used by the streaming preview where Cloudflare's
+ * sync edge timeout (~100s) is fatal — every server-side chunk resets it,
+ * so as long as the model emits text we never hit the cap.
+ *
+ * Native EventSource doesn't support POST or auth-cookie credentials, hence
+ * the manual fetch + ReadableStream consumer. Caller controls aborting via
+ * the supplied AbortSignal (we attach it to the underlying fetch).
+ */
+export async function apiSseStream(
+  path: string,
+  body: unknown,
+  handlers: SseHandlers,
+  signal?: AbortSignal,
+): Promise<void> {
+  let res: Response;
+  try {
+    res = await fetch(`${API_BASE}${path}`, {
+      method: 'POST',
+      credentials: 'include',
+      cache: 'no-store',
+      headers: {
+        'Content-Type': 'application/json',
+        Accept: 'text/event-stream',
+      },
+      body: JSON.stringify(body),
+      signal,
+    });
+  } catch (err) {
+    handlers.onError?.(err as Error);
+    return;
+  }
+
+  if (!res.ok) {
+    let detail: unknown = undefined;
+    try {
+      detail = await res.json();
+    } catch {}
+    const err = new Error(`api_error_${res.status}`);
+    (err as unknown as { detail?: unknown }).detail = detail;
+    (err as unknown as { status?: number }).status = res.status;
+    handlers.onError?.(err);
+    return;
+  }
+
+  const reader = res.body?.getReader();
+  if (!reader) {
+    handlers.onError?.(new Error('no_response_body'));
+    return;
+  }
+
+  const decoder = new TextDecoder();
+  let buffer = '';
+  try {
+    while (true) {
+      const { value, done } = await reader.read();
+      if (done) break;
+      buffer = (buffer + decoder.decode(value, { stream: true })).replace(/\r\n/g, '\n');
+      // SSE events are separated by a blank line (CRLF normalized above). Parse one at a time.
+      let idx = buffer.indexOf('\n\n');
+      while (idx >= 0) {
+        const raw = buffer.slice(0, idx);
+        buffer = buffer.slice(idx + 2);
+        let event = 'message';
+        const dataLines: string[] = [];
+        for (const line of raw.split('\n')) {
+          if (line.startsWith(':')) continue; // SSE comment
+          if (line.startsWith('event:')) event = line.slice(6).trim();
+          else if (line.startsWith('data:')) dataLines.push(line.slice(5).trim());
+        }
+        if (dataLines.length > 0) {
+          const data = dataLines.join('\n');
+          let parsed: unknown = data;
+          try {
+            parsed = JSON.parse(data);
+          } catch {
+            // Treat as a raw string event.
+          }
+          handlers.onEvent(event, parsed);
+        }
+        idx = buffer.indexOf('\n\n');
+      }
+    }
+  } catch (err) {
+    handlers.onError?.(err as Error);
+  }
+}
diff --git a/packages/llm/src/index.ts b/packages/llm/src/index.ts
index 743b1f4..1c2c546 100644
--- a/packages/llm/src/index.ts
+++ b/packages/llm/src/index.ts
@@ -295,6 +295,79 @@ async function generateWithAnthropic(
   return { spec: parsed.data, source: 'claude' };
 }
 
+// ──────────────────────────────────────────────────────────────────────────
+// Streaming generation (Anthropic only)
+// ──────────────────────────────────────────────────────────────────────────
+
+export interface StreamHandlers {
+  /** Called for each text delta emitted by the model. */
+  onText: (text: string) => void;
+  /** Called once with the final spec on success. May be async; it is awaited. */
+  onSpec: (result: GenerationResult) => void | Promise<void>;
+  /** Called once on any terminal error (timeout, truncation, validation). */
+  onError: (err: Error) => void;
+}
+
+/**
+ * Stream a spec from Anthropic, piping text deltas to a handler and finally
+ * surfacing the parsed/validated spec or the relevant typed error.
+ *
+ * Why streaming for /preview: Cloudflare's edge timeout is ~100s on the Free
+ * tier and our previous sync call could approach that for ambitious prompts.
+ * With streaming the TCP connection writes bytes from the first model token,
+ * which keeps CF (and nginx) from cutting us off — runtime is bounded only by
+ * the model itself and our own AbortController, not by CF.
+ */
+export async function streamSpecFromAnthropic(
+  prompt: string,
+  opts: { apiKey: string; model: string; maxTokens: number; signal?: AbortSignal },
+  handlers: StreamHandlers,
+): Promise<void> {
+  const client = new Anthropic({ apiKey: opts.apiKey });
+  let accumulated = '';
+
+  try {
+    const stream = client.messages.stream({
+      model: opts.model,
+      max_tokens: opts.maxTokens,
+      system: SYSTEM_PROMPT,
+      messages: [{ role: 'user', content: prompt }],
+    });
+
+    if (opts.signal) {
+      opts.signal.addEventListener('abort', () => stream.abort(), { once: true });
+    }
+
+    stream.on('text', (delta) => {
+      accumulated += delta;
+      handlers.onText(delta);
+    });
+
+    const final = await stream.finalMessage();
+
+    if (final.stop_reason === 'max_tokens') {
+      throw new SpecTruncatedError(
+        `model hit max_tokens (${opts.maxTokens}) before finishing the spec`,
+      );
+    }
+
+    const json = extractJson(accumulated);
+    const parsed = GeneratorSpec.safeParse(json);
+    if (!parsed.success) {
+      const preview = accumulated.slice(0, 400).replace(/\s+/g, ' ');
+      throw new SpecValidationError(`${parsed.error.message} :: raw="${preview}"`);
+    }
+    scanForInjection(parsed.data);
+    await handlers.onSpec({ spec: parsed.data, source: 'claude' });
+  } catch (err) {
+    if (err instanceof Anthropic.APIConnectionTimeoutError) {
+      handlers.onError(new SpecTimeoutError('spec generation exceeded the time budget'));
+      return;
+    }
+    handlers.onError(err instanceof Error ? err : new Error(String(err)));
+  }
+}
+
 const GLM_ENDPOINT = 'https://open.bigmodel.cn/api/paas/v4/chat/completions';
 
 async function generateWithGlm(