feat(preview): SSE-streamed generation, no CF 100s edge cap
All checks were successful
Deploy to Production / deploy (push) Successful in 1m27s

Architectural fix for "spec_too_large" / preview_timeout — the sync
endpoint had to fit the whole model run into Cloudflare's ~100s edge
window, which made the system fragile against any prompt that produced
a verbose spec. The new streaming path pipes Anthropic's token deltas
as Server-Sent Events; every chunk resets CF's idle timer and a 15s
keepalive comment guarantees activity even during slow first-token
windows.

@bmm/llm: new streamSpecFromAnthropic() exposes the SDK's .stream()
flow with the same typed-error contract as generateSpec — same
SpecTruncatedError / SpecValidationError / SpecTimeoutError raised from
the relevant moment.

API: POST /v1/servers/preview/stream returns text/event-stream with
events 'text' (deltas), 'spec' (final success payload, same shape as
the sync endpoint), 'error' (typed). Anthropic-only — GLM/hobby falls
back to the sync route via 409 streaming_unavailable.

Frontend: apiSseStream() handles the POST + ReadableStream + SSE
parser. The wizard's analyze() prefers the stream and only uses the
sync endpoint on the explicit 409 fallback.

nginx (api.buildmymcpserver.com): the /v1/builds/ location block (which
already had proxy_buffering off + 600s read timeout for the WS build
stream) now also matches /v1/servers/preview/stream so the SSE
response isn't buffered.
This commit is contained in:
Marco Sadjadi 2026-05-28 21:11:05 +02:00
parent b930a454e8
commit 0c6d738a6b
4 changed files with 385 additions and 12 deletions

View File

@ -19,6 +19,7 @@ import {
generateSpec,
pickPreviewModel,
scanForInjection,
streamSpecFromAnthropic,
} from '@bmm/llm';
import {
BuildEvent,
@ -174,6 +175,162 @@ export async function serverRoutes(app: FastifyInstance): Promise<void> {
}
});
// Streaming preview — pipes the model's text deltas back as Server-Sent
// Events so Cloudflare's ~100s edge cap is irrelevant: every chunk we
// write resets the idle timer. The final event is either `spec` (success)
// or `error` (any of the typed errors raised by the LLM layer).
//
// Anthropic-only for now: GLM doesn't ship a clean streaming JSON
// contract that justifies the duplication, and hobby's 4096-token budget
// already fits the sync path comfortably. The route falls back to the
// sync endpoint if the request comes from a non-Anthropic tier.
app.post('/v1/servers/preview/stream', { preHandler: requireAuth }, async (req, reply) => {
const user = req.user!;
const parsed = PreviewInput.safeParse(req.body);
if (!parsed.success) {
return reply.code(400).send({ error: 'invalid_input', issues: parsed.error.flatten() });
}
const billing = await getOrgBilling(user.orgId);
if (billing.suspended) {
return reply.code(402).send({
error: 'subscription_suspended',
detail:
billing.suspendedReason === 'payment_failed'
? 'Your subscription is paused due to a payment issue. Update your payment method in /settings/billing.'
: 'Your subscription is paused. Visit /settings/billing for details.',
suspendedReason: billing.suspendedReason,
});
}
const plan = billing.plan;
const rl = await checkDailyLimit('preview', user.userId, PREVIEW_DAILY_LIMIT[plan]);
if (!rl.ok) {
return reply.code(429).send({
error: 'rate_limited',
detail: `Daily preview limit reached for plan "${plan}" (${PREVIEW_DAILY_LIMIT[plan]}/day). Resets in ${Math.ceil(rl.resetIn / 3600)}h.`,
plan,
limit: PREVIEW_DAILY_LIMIT[plan],
resetIn: rl.resetIn,
});
}
const choice = pickPreviewModel(plan);
if (choice.provider !== 'anthropic' || !config.ANTHROPIC_API_KEY) {
return reply.code(409).send({
error: 'streaming_unavailable',
detail: 'Streaming preview is only available for Anthropic-backed tiers. Use POST /v1/servers/preview instead.',
});
}
// SSE response. X-Accel-Buffering disables nginx's response buffering
// so each chunk lands at the client immediately rather than after the
// full response is built — critical for the keepalive-vs-CF-100s logic
// to actually work.
reply.raw.setHeader('Content-Type', 'text/event-stream');
reply.raw.setHeader('Cache-Control', 'no-cache, no-transform');
reply.raw.setHeader('Connection', 'keep-alive');
reply.raw.setHeader('X-Accel-Buffering', 'no');
reply.raw.flushHeaders();
const send = (event: string, data: unknown) => {
reply.raw.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
};
// Heartbeat comment every 15s. Cloudflare's edge keeps the connection
// open as long as bytes flow; comments are SSE-noop but count as bytes.
const keepalive = setInterval(() => reply.raw.write(`: ping\n\n`), 15_000);
const abort = new AbortController();
req.raw.on('close', () => abort.abort());
let resolved = false;
await streamSpecFromAnthropic(
parsed.data.prompt,
{
apiKey: config.ANTHROPIC_API_KEY,
model: choice.model,
maxTokens: choice.maxTokens,
signal: abort.signal,
},
{
onText: (delta) => send('text', delta),
onSpec: async ({ spec, source }) => {
resolved = true;
const previewId = await cacheSpec(spec);
send('spec', {
previewId,
source,
plan,
modelDisplayName: choice.displayName,
modelBadge: choice.displayBadge,
upgradeHint: plan === 'hobby',
spec: {
name: spec.name,
description: spec.description,
tools: spec.tools.map((t) => ({
name: t.name,
description: t.description,
inputSchema: t.inputSchema,
})),
requiredSecrets: spec.requiredSecrets,
scopes: spec.scopes,
},
});
},
onError: (err) => {
resolved = true;
if (err instanceof SpecTruncatedError) {
app.log.warn(
{
reason: err.message,
prompt: parsed.data.prompt.slice(0, 200),
model: choice.displayName,
},
'preview_spec_truncated',
);
send('error', {
error: 'spec_too_large',
detail:
'The spec for this prompt exceeded the maximum response size. Split it into fewer tools or describe one capability per prompt.',
});
return;
}
if (err instanceof SpecValidationError) {
app.log.warn(
{
zod_message: err.message,
prompt: parsed.data.prompt.slice(0, 200),
model: choice.displayName,
},
'preview_spec_invalid',
);
send('error', { error: 'spec_invalid', detail: err.message });
return;
}
if (err instanceof BannedPatternError) {
send('error', { error: 'banned_pattern', detail: err.message });
return;
}
if (err instanceof SpecTimeoutError) {
send('error', {
error: 'preview_timeout',
detail: 'Spec generation took too long. Try a shorter, more specific prompt.',
});
return;
}
app.log.error(err);
send('error', { error: 'preview_failed', detail: err.message });
},
},
);
if (!resolved) {
send('error', { error: 'preview_failed', detail: 'stream ended without a final event' });
}
clearInterval(keepalive);
reply.raw.end();
});
app.post('/v1/servers', { preHandler: requireAuth }, async (req, reply) => {
const user = req.user!;
const parsed = CreateServerInput.safeParse(req.body);

View File

@ -5,7 +5,7 @@ import { Input, Label, Textarea } from '@/components/input';
import { InstallSnippets } from '@/components/install-snippets';
import { StreamingLogs } from '@/components/streaming-logs';
import { Button } from '@/components/ui/button';
import { apiFetch } from '@/lib/api';
import { apiFetch, apiSseStream } from '@/lib/api';
import { Loader2, RotateCcw, X } from 'lucide-react';
import Link from 'next/link';
import { useRouter, useSearchParams } from 'next/navigation';
@ -241,19 +241,69 @@ function NewServerPageInner() {
return;
}
setStep('analyzing');
try {
const res = await apiFetch<PreviewResponse>('/v1/servers/preview', {
method: 'POST',
body: JSON.stringify({ prompt }),
});
setPreview(res);
setEditable(null); // will re-init via useEffect
// Streaming preview: pipes Anthropic's token deltas back as SSE. Cloudflare's
// ~100s edge cap doesn't bite because every chunk we receive resets the
// idle timer; the only practical limit is the model's own runtime. If the
// backend returns 409 streaming_unavailable (e.g. hobby/GLM tier), we fall
// back to the sync endpoint so the wizard still works there.
let finalResolved = false;
let sseError: { error?: string; detail?: string } | null = null;
let sseSpec: PreviewResponse | null = null;
await apiSseStream(
'/v1/servers/preview/stream',
{ prompt },
{
onEvent: (event, data) => {
if (event === 'spec') {
finalResolved = true;
sseSpec = data as PreviewResponse;
} else if (event === 'error') {
finalResolved = true;
sseError = data as { error?: string; detail?: string };
}
// 'text' deltas are ignored for now — the wizard already shows a
// spinner. We could surface partial JSON later if useful.
},
onError: (err) => {
sseError = { detail: err.message };
},
},
);
if (finalResolved && sseSpec) {
setPreview(sseSpec);
setEditable(null);
setStep('confirm');
} catch (e) {
const detail = (e as { detail?: { error?: string; detail?: string } }).detail;
setError(detail?.detail ?? detail?.error ?? (e as Error).message);
setStep('prompt');
return;
}
if (sseError && (sseError as { error?: string }).error === 'streaming_unavailable') {
// GLM / mock tier — fall back to sync.
try {
const res = await apiFetch<PreviewResponse>('/v1/servers/preview', {
method: 'POST',
body: JSON.stringify({ prompt }),
});
setPreview(res);
setEditable(null);
setStep('confirm');
return;
} catch (e) {
const detail = (e as { detail?: { error?: string; detail?: string } }).detail;
setError(detail?.detail ?? detail?.error ?? (e as Error).message);
setStep('prompt');
return;
}
}
setError(
(sseError as { detail?: string } | null)?.detail ??
(sseError as { error?: string } | null)?.error ??
'Spec generation failed.',
);
setStep('prompt');
}
function updateTool(i: number, patch: Partial<EditableTool>) {

View File

@ -33,3 +33,96 @@ export function apiWebSocketURL(path: string): string {
const wsBase = httpBase.replace(/^http/, 'ws');
return `${wsBase}${path}`;
}
export interface SseHandlers {
onEvent: (event: string, data: unknown) => void;
onError?: (err: Error) => void;
}
/**
* POST with SSE response. Used by the streaming preview where Cloudflare's
* sync edge timeout (~100s) is fatal every server-side chunk resets it,
* so as long as the model emits text we never hit the cap.
*
* Native EventSource doesn't support POST or auth-cookie credentials, hence
* the manual fetch + ReadableStream consumer. Caller controls aborting via
* the supplied AbortSignal (we attach it to the underlying fetch).
*/
export async function apiSseStream(
path: string,
body: unknown,
handlers: SseHandlers,
signal?: AbortSignal,
): Promise<void> {
let res: Response;
try {
res = await fetch(`${API_BASE}${path}`, {
method: 'POST',
credentials: 'include',
cache: 'no-store',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
},
body: JSON.stringify(body),
signal,
});
} catch (err) {
handlers.onError?.(err as Error);
return;
}
if (!res.ok) {
let detail: unknown = undefined;
try {
detail = await res.json();
} catch {}
const err = new Error(`api_error_${res.status}`);
(err as unknown as { detail?: unknown }).detail = detail;
(err as unknown as { status?: number }).status = res.status;
handlers.onError?.(err);
return;
}
const reader = res.body?.getReader();
if (!reader) {
handlers.onError?.(new Error('no_response_body'));
return;
}
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// SSE events are separated by a blank line. Parse one event at a time.
let idx = buffer.indexOf('\n\n');
while (idx >= 0) {
const raw = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
let event = 'message';
const dataLines: string[] = [];
for (const line of raw.split('\n')) {
if (line.startsWith(':')) continue; // SSE comment
if (line.startsWith('event:')) event = line.slice(6).trim();
else if (line.startsWith('data:')) dataLines.push(line.slice(5).trim());
}
if (dataLines.length > 0) {
const data = dataLines.join('\n');
let parsed: unknown = data;
try {
parsed = JSON.parse(data);
} catch {
// Treat as a raw string event.
}
handlers.onEvent(event, parsed);
}
idx = buffer.indexOf('\n\n');
}
}
} catch (err) {
handlers.onError?.(err as Error);
}
}

View File

@ -295,6 +295,79 @@ async function generateWithAnthropic(
return { spec: parsed.data, source: 'claude' };
}
// ──────────────────────────────────────────────────────────────────────────
// Streaming generation (Anthropic only)
// ──────────────────────────────────────────────────────────────────────────
export interface StreamHandlers {
/** Called for each text delta emitted by the model. */
onText: (text: string) => void;
/** Called once when the stream completes successfully with the final spec. */
onSpec: (result: GenerationResult) => void;
/** Called once on any terminal error (timeout, truncation, validation). */
onError: (err: Error) => void;
}
/**
* Stream a spec from Anthropic, piping text deltas to a handler and finally
* surfacing the parsed/validated spec or the relevant typed error.
*
* Why streaming for /preview: Cloudflare's edge timeout is ~100s on the Free
* tier and our previous sync call could approach that for ambitious prompts.
* With streaming the TCP connection writes bytes from the first model token,
* which keeps CF (and nginx) from cutting us off runtime is bounded only by
* the model itself and our own AbortController, not by CF.
*/
export async function streamSpecFromAnthropic(
prompt: string,
opts: { apiKey: string; model: string; maxTokens: number; signal?: AbortSignal },
handlers: StreamHandlers,
): Promise<void> {
const client = new Anthropic({ apiKey: opts.apiKey });
let accumulated = '';
try {
const stream = client.messages.stream({
model: opts.model,
max_tokens: opts.maxTokens,
system: SYSTEM_PROMPT,
messages: [{ role: 'user', content: prompt }],
});
if (opts.signal) {
opts.signal.addEventListener('abort', () => stream.abort(), { once: true });
}
stream.on('text', (delta) => {
accumulated += delta;
handlers.onText(delta);
});
const final = await stream.finalMessage();
if (final.stop_reason === 'max_tokens') {
throw new SpecTruncatedError(
`model hit max_tokens (${opts.maxTokens}) before finishing the spec`,
);
}
const json = extractJson(accumulated);
const parsed = GeneratorSpec.safeParse(json);
if (!parsed.success) {
const preview = accumulated.slice(0, 400).replace(/\s+/g, ' ');
throw new SpecValidationError(`${parsed.error.message} :: raw="${preview}"`);
}
scanForInjection(parsed.data);
handlers.onSpec({ spec: parsed.data, source: 'claude' });
} catch (err) {
if (err instanceof Anthropic.APIConnectionTimeoutError) {
handlers.onError(new SpecTimeoutError('spec generation exceeded the time budget'));
return;
}
handlers.onError(err instanceof Error ? err : new Error(String(err)));
}
}
const GLM_ENDPOINT = 'https://open.bigmodel.cn/api/paas/v4/chat/completions';
async function generateWithGlm(