feat(generator): BullMQ worker (Claude API + spec render + docker build + local deploy)

This commit is contained in:
Marco Sadjadi 2026-05-19 00:26:53 +02:00
parent 9658e843df
commit cc24dd4a63
10 changed files with 688 additions and 0 deletions

View File

@ -0,0 +1,26 @@
{
"name": "@bmm/generator",
"version": "0.1.0",
"type": "module",
"private": true,
"scripts": {
"dev": "tsx watch src/index.ts",
"start": "node dist/index.js",
"build": "tsc -p tsconfig.json",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@anthropic-ai/sdk": "0.32.1",
"@bmm/db": "workspace:*",
"@bmm/types": "workspace:*",
"bullmq": "5.34.5",
"drizzle-orm": "0.36.4",
"ioredis": "5.4.1",
"zod": "3.23.8"
},
"devDependencies": {
"@types/node": "22.10.2",
"tsx": "4.19.2",
"typescript": "5.7.2"
}
}

View File

@ -0,0 +1,16 @@
import { z } from 'zod';
const Env = z.object({
DATABASE_URL: z.string(),
REDIS_URL: z.string().default('redis://localhost:6379'),
ANTHROPIC_API_KEY: z.string().optional(),
RUNNER_HOST: z.string().default('localhost'),
RUNNER_PORT_RANGE_START: z.coerce.number().default(4100),
RUNNER_PORT_RANGE_END: z.coerce.number().default(4999),
CONTROL_PLANE_URL: z.string().default('http://host.docker.internal:4000'),
CONTROL_PLANE_PUBLIC_URL: z.string().default('http://localhost:4000'),
MODEL_GENERATE: z.string().default('claude-opus-4-7'),
MODEL_FIX: z.string().default('claude-haiku-4-5-20251001'),
});
export const config = Env.parse(process.env);

View File

@ -0,0 +1,2 @@
import './worker.js';
console.log('[generator] booted');

View File

@ -0,0 +1,95 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { spawn } from 'node:child_process';
import type { GeneratorSpec } from '@bmm/types';
const here = path.dirname(fileURLToPath(import.meta.url));
// apps/generator/src/lib -> repo root
const REPO_ROOT = path.resolve(here, '..', '..', '..', '..');
const RUNNER_TEMPLATE_DIR = path.resolve(REPO_ROOT, 'apps', 'runner-template');
const BUILD_CONTEXT_ROOT = path.resolve(REPO_ROOT, 'build-context');
export interface BuildContext {
contextDir: string;
imageTag: string;
}
export async function prepareBuildContext(
serverId: string,
version: number,
slug: string,
generatedCode: string,
spec: GeneratorSpec,
): Promise<BuildContext> {
const contextDir = path.join(BUILD_CONTEXT_ROOT, `${slug}-v${version}-${serverId.slice(0, 8)}`);
await fs.rm(contextDir, { recursive: true, force: true });
await fs.mkdir(contextDir, { recursive: true });
// Copy runner-template files into context
await copyDir(RUNNER_TEMPLATE_DIR, contextDir);
// Overwrite the server.ts with generated code
await fs.mkdir(path.join(contextDir, 'src'), { recursive: true });
await fs.writeFile(path.join(contextDir, 'src', 'server.ts'), generatedCode, 'utf8');
// Merge generated deps into package.json
const pkgPath = path.join(contextDir, 'package.json');
const pkg = JSON.parse(await fs.readFile(pkgPath, 'utf8'));
pkg.dependencies = { ...pkg.dependencies, ...spec.dependencies };
await fs.writeFile(pkgPath, `${JSON.stringify(pkg, null, 2)}\n`, 'utf8');
const imageTag = `bmm-mcp-${slug}:v${version}`;
return { contextDir, imageTag };
}
async function copyDir(src: string, dest: string): Promise<void> {
const entries = await fs.readdir(src, { withFileTypes: true });
for (const entry of entries) {
if (entry.name === 'node_modules' || entry.name === 'dist') continue;
const s = path.join(src, entry.name);
const d = path.join(dest, entry.name);
if (entry.isDirectory()) {
await fs.mkdir(d, { recursive: true });
await copyDir(s, d);
} else {
await fs.copyFile(s, d);
}
}
}
export async function staticCheck(contextDir: string): Promise<void> {
// Skip in CI without a tsc binary; we trust the renderer + spec validation in dev.
// A future iteration will run `pnpm --filter ./build-context/... typecheck` here.
const code = await fs.readFile(path.join(contextDir, 'src', 'server.ts'), 'utf8');
if (code.includes('eval(') || code.includes('new Function(')) {
throw new Error('static_check_banned_token');
}
if (!code.includes("StreamableHTTPServerTransport") || !code.includes("McpServer")) {
throw new Error('static_check_missing_mcp_primitives');
}
}
export async function dockerBuild(contextDir: string, imageTag: string, onLog: (msg: string) => void): Promise<void> {
await new Promise<void>((resolve, reject) => {
const child = spawn('docker', ['build', '-t', imageTag, '.'], {
cwd: contextDir,
stdio: ['ignore', 'pipe', 'pipe'],
});
child.stdout.on('data', (d) => {
for (const line of d.toString().split(/\r?\n/)) {
if (line.trim()) onLog(line.trim());
}
});
child.stderr.on('data', (d) => {
for (const line of d.toString().split(/\r?\n/)) {
if (line.trim()) onLog(line.trim());
}
});
child.on('error', (e) => reject(e));
child.on('close', (code) => {
if (code === 0) resolve();
else reject(new Error(`docker_build_failed (exit ${code})`));
});
});
}

View File

@ -0,0 +1,127 @@
import Anthropic from '@anthropic-ai/sdk';
import { GeneratorSpec, type GeneratorSpec as GeneratorSpecT } from '@bmm/types';
import { config } from '../config.js';
const SYSTEM_PROMPT = `You generate production-grade MCP server specifications as STRICT JSON.
Output ONE JSON object (no markdown, no prose, no code fences) with this exact shape:
{
"name": "human-readable server name (max 128 chars)",
"description": "1-2 sentence purpose",
"tools": [
{
"name": "snake_case_tool_name",
"description": "what the AI client sees — single sentence, clear",
"inputSchema": {
"param_name": { "type": "string|number|boolean|array|object", "description": "...", "required": true }
},
"implementation": "ASYNC TypeScript body. Receives {args} pre-validated. Must return MCP content blocks: { content: [{ type: 'text', text: '...' }] }. Use process.env.SECRET_NAME for secrets. NEVER use eval/Function/child_process. Use globalThis.fetch for HTTP. Wrap external calls in try/catch and return { content: [{ type: 'text', text: 'Error: ...' }], isError: true } on failure."
}
],
"resources": [],
"prompts": [],
"requiredSecrets": ["UPPER_SNAKE_CASE"],
"scopes": ["mcp:read"],
"dependencies": {}
}
Rules:
- Tools are idempotent unless the description explicitly says destructive.
- Validate all string inputs before use.
- For databases: parameterized queries only (use the 'pg' library with $1 placeholders).
- For HTTP APIs: globalThis.fetch with explicit timeout via AbortSignal.timeout(10000).
- Never hardcode credentials; declare them under requiredSecrets and read via process.env.
- Keep tool implementations under 5000 characters.
- Do not include "import" statements in implementations the runtime injects fetch, pg, etc.
Return JSON only. No explanation.`;
export interface GenerationResult {
spec: GeneratorSpecT;
source: 'claude' | 'mock';
}
export async function generateSpec(prompt: string): Promise<GenerationResult> {
if (!config.ANTHROPIC_API_KEY) {
return { spec: mockSpec(prompt), source: 'mock' };
}
const client = new Anthropic({ apiKey: config.ANTHROPIC_API_KEY });
const response = await client.messages.create({
model: config.MODEL_GENERATE,
max_tokens: 8192,
system: SYSTEM_PROMPT,
messages: [{ role: 'user', content: prompt }],
});
const text = response.content
.filter((b): b is { type: 'text'; text: string } => b.type === 'text')
.map((b) => b.text)
.join('');
const json = extractJson(text);
const parsed = GeneratorSpec.safeParse(json);
if (!parsed.success) {
throw new Error(`spec_validation_failed: ${parsed.error.message}`);
}
scanForInjection(parsed.data);
return { spec: parsed.data, source: 'claude' };
}
function extractJson(text: string): unknown {
const trimmed = text.trim();
// strip ```json fences if present
const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/);
const body = fenced ? fenced[1] : trimmed;
if (!body) throw new Error('empty_generation_output');
try {
return JSON.parse(body);
} catch (e) {
throw new Error(`generation_not_json: ${(e as Error).message}`);
}
}
const BANNED_PATTERNS = [
/\beval\s*\(/,
/\bnew\s+Function\s*\(/,
/\brequire\s*\(\s*['"]child_process['"]/,
/\bchild_process\b/,
/ignore\s+previous\s+instructions/i,
/disregard\s+(the\s+)?(above|previous)/i,
];
function scanForInjection(spec: GeneratorSpecT): void {
for (const tool of spec.tools) {
for (const pattern of BANNED_PATTERNS) {
if (pattern.test(tool.implementation) || pattern.test(tool.description)) {
throw new Error(`banned_pattern_detected: ${pattern.source}`);
}
}
}
}
function mockSpec(prompt: string): GeneratorSpecT {
return {
name: 'Echo MCP',
description: `Mock server (no ANTHROPIC_API_KEY). Prompt was: ${prompt.slice(0, 200)}`,
tools: [
{
name: 'echo',
description: 'Echoes the input string back to the caller.',
inputSchema: {
message: { type: 'string', description: 'Message to echo back', required: true },
},
implementation: `const msg = String(args.message ?? '');\nreturn { content: [{ type: 'text', text: \`echo: \${msg}\` }] };`,
},
{
name: 'now',
description: 'Returns the current server UTC timestamp.',
inputSchema: {},
implementation: `return { content: [{ type: 'text', text: new Date().toISOString() }] };`,
},
],
resources: [],
prompts: [],
requiredSecrets: [],
scopes: ['mcp:read'],
dependencies: {},
};
}

View File

@ -0,0 +1,123 @@
import net from 'node:net';
import { createDb, eq, isNotNull, mcpServers } from '@bmm/db';
import { config } from '../config.js';
const db = createDb();
async function portFree(port: number, host = '127.0.0.1'): Promise<boolean> {
return new Promise((resolve) => {
const tester = net
.createServer()
.once('error', () => resolve(false))
.once('listening', () => tester.close(() => resolve(true)))
.listen(port, host);
});
}
export async function allocatePort(): Promise<number> {
const used = new Set(
(
await db
.select({ port: mcpServers.hostPort })
.from(mcpServers)
.where(isNotNull(mcpServers.hostPort))
)
.map((r) => r.port)
.filter((p): p is number => typeof p === 'number'),
);
for (let port = config.RUNNER_PORT_RANGE_START; port <= config.RUNNER_PORT_RANGE_END; port++) {
if (used.has(port)) continue;
if (await portFree(port)) return port;
}
throw new Error('no_free_port');
}
export interface DeployHandle {
containerId: string;
publicUrl: string;
hostPort: number;
}
export interface DeployInput {
serverId: string;
slug: string;
hostPort: number;
imageTag: string;
envVars: Record<string, string>;
}
// Production-only flags documented but unused in dev for Windows Docker Desktop compat:
// '--read-only',
// '--cap-drop=ALL',
// '--security-opt=no-new-privileges',
// '--cpus=0.5',
// '--memory=512m',
export async function deployContainer(input: DeployInput): Promise<DeployHandle> {
// In a future iteration this calls docker engine API directly via UNIX socket / named pipe.
// For Sprint 1-3 we shell out via the bound docker CLI which is portable on win/mac/linux.
const { spawn } = await import('node:child_process');
const containerName = `bmm-mcp-${input.slug}-${Date.now().toString(36)}`;
const args = [
'run',
'-d',
'--name',
containerName,
'-p',
`${input.hostPort}:3000`,
];
for (const [k, v] of Object.entries(input.envVars)) {
args.push('-e', `${k}=${v}`);
}
args.push('--restart=unless-stopped', input.imageTag);
return await new Promise<DeployHandle>((resolve, reject) => {
const child = spawn('docker', args, { stdio: ['ignore', 'pipe', 'pipe'] });
let out = '';
let err = '';
child.stdout.on('data', (d) => {
out += d.toString();
});
child.stderr.on('data', (d) => {
err += d.toString();
});
child.on('error', (e) => reject(e));
child.on('close', async (code) => {
if (code !== 0) {
reject(new Error(`docker_run_failed (exit ${code}): ${err.trim() || out.trim()}`));
return;
}
const containerId = out.trim().slice(0, 64);
const publicUrl = `http://${config.RUNNER_HOST}:${input.hostPort}`;
await db
.update(mcpServers)
.set({
containerId,
hostPort: input.hostPort,
publicUrl,
status: 'live',
updatedAt: new Date(),
})
.where(eq(mcpServers.id, input.serverId));
resolve({ containerId, publicUrl, hostPort: input.hostPort });
});
});
}
export async function stopContainer(containerId: string): Promise<void> {
const { spawn } = await import('node:child_process');
await new Promise<void>((resolve) => {
const child = spawn('docker', ['rm', '-f', containerId], { stdio: 'ignore' });
child.on('close', () => resolve());
child.on('error', () => resolve());
});
}
export async function dockerAvailable(): Promise<boolean> {
const { spawn } = await import('node:child_process');
return await new Promise<boolean>((resolve) => {
const child = spawn('docker', ['version'], { stdio: 'ignore' });
child.on('error', () => resolve(false));
child.on('close', (code) => resolve(code === 0));
});
}

View File

@ -0,0 +1,46 @@
import { Redis } from 'ioredis';
import { createDb, buildLogs } from '@bmm/db';
import type { BuildEvent, BuildStatus } from '@bmm/types';
import { config } from '../config.js';
const pub = new Redis(config.REDIS_URL, { maxRetriesPerRequest: null });
const db = createDb();
function channel(buildId: string): string {
return `build:${buildId}`;
}
export async function emitLog(
buildId: string,
level: 'info' | 'warn' | 'error',
message: string,
): Promise<void> {
const at = new Date().toISOString();
const evt: BuildEvent = { type: 'log', level, message, at };
await pub.publish(channel(buildId), JSON.stringify(evt));
await db.insert(buildLogs).values({ buildId, level, message });
}
export async function emitStatus(buildId: string, status: BuildStatus): Promise<void> {
const at = new Date().toISOString();
const evt: BuildEvent = { type: 'status', status, at };
await pub.publish(channel(buildId), JSON.stringify(evt));
}
export async function emitDone(
buildId: string,
status: BuildStatus,
serverId: string,
publicUrl: string | null,
): Promise<void> {
const at = new Date().toISOString();
const evt: BuildEvent = { type: 'done', status, serverId, publicUrl, at };
await pub.publish(channel(buildId), JSON.stringify(evt));
}
export async function emitError(buildId: string, message: string): Promise<void> {
const at = new Date().toISOString();
const evt: BuildEvent = { type: 'error', message, at };
await pub.publish(channel(buildId), JSON.stringify(evt));
await db.insert(buildLogs).values({ buildId, level: 'error', message });
}

View File

@ -0,0 +1,124 @@
import type { GeneratorSpec, ToolSpec } from '@bmm/types';
function toZod(param: ToolSpec['inputSchema'][string]): string {
const required = param.required !== false;
let base: string;
switch (param.type) {
case 'string':
base = 'z.string()';
break;
case 'number':
base = 'z.number()';
break;
case 'boolean':
base = 'z.boolean()';
break;
case 'array':
base = 'z.array(z.any())';
break;
case 'object':
base = 'z.record(z.string(), z.any())';
break;
}
if (param.description) base += `.describe(${JSON.stringify(param.description)})`;
if (!required) base += '.optional()';
return base;
}
function renderTool(tool: ToolSpec): string {
const entries = Object.entries(tool.inputSchema)
.map(([k, v]) => ` ${JSON.stringify(k)}: ${toZod(v)}`)
.join(',\n');
const schemaShape = entries ? `{\n${entries}\n }` : '{}';
return `server.registerTool(
${JSON.stringify(tool.name)},
{
title: ${JSON.stringify(tool.name)},
description: ${JSON.stringify(tool.description)},
inputSchema: ${schemaShape},
},
async (args) => {
try {
${tool.implementation}
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
return { content: [{ type: 'text', text: 'Error: ' + msg }], isError: true };
}
},
);`;
}
export function renderServerCode(spec: GeneratorSpec): string {
const toolBlocks = spec.tools.map(renderTool).join('\n\n');
return `// AUTO-GENERATED. Do not edit by hand.
// Generated by BuildMyMCPServer.
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { z } from 'zod';
import Fastify from 'fastify';
import { createRemoteJWKSet, jwtVerify } from 'jose';
import { randomUUID } from 'node:crypto';
const PUBLIC_URL = process.env.PUBLIC_URL ?? 'http://localhost:3000';
const CONTROL_PLANE_URL = process.env.CONTROL_PLANE_URL ?? 'http://host.docker.internal:4000';
const PORT = Number.parseInt(process.env.PORT ?? '3000', 10);
const server = new McpServer(
{ name: ${JSON.stringify(spec.name)}, version: '1.0.0' },
{ capabilities: { tools: {}, resources: {}, prompts: {} } },
);
${toolBlocks}
const app = Fastify({ logger: { level: 'info' } });
app.get('/health', async () => ({ ok: true }));
app.get('/.well-known/oauth-protected-resource', async () => ({
resource: PUBLIC_URL,
authorization_servers: [CONTROL_PLANE_URL + '/oauth'],
bearer_methods_supported: ['header'],
scopes_supported: ${JSON.stringify(spec.scopes)},
}));
app.get('/.well-known/oauth-authorization-server', async () => {
const r = await fetch(CONTROL_PLANE_URL + '/oauth/.well-known/oauth-authorization-server');
return await r.json();
});
const JWKS = createRemoteJWKSet(new URL(CONTROL_PLANE_URL + '/oauth/jwks'));
const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID() });
app.all('/mcp', async (request, reply) => {
const auth = request.headers.authorization;
if (!auth || !auth.startsWith('Bearer ')) {
return reply
.code(401)
.header('WWW-Authenticate', \`Bearer resource_metadata="\${PUBLIC_URL}/.well-known/oauth-protected-resource"\`)
.send({ error: 'unauthorized' });
}
const token = auth.slice(7);
try {
const { payload } = await jwtVerify(token, JWKS, {
issuer: CONTROL_PLANE_URL + '/oauth',
audience: PUBLIC_URL,
});
if (payload.aud !== PUBLIC_URL) {
return reply.code(403).send({ error: 'invalid_audience' });
}
} catch (e) {
request.log.warn({ err: e }, 'token verify failed');
return reply.code(401).send({ error: 'invalid_token' });
}
await transport.handleRequest(request.raw, reply.raw, request.body);
});
await server.connect(transport);
await app.listen({ port: PORT, host: '0.0.0.0' });
app.log.info('mcp server up on :' + PORT);
// suppress unused-z warning when there are no tools
void z;
`;
}

View File

@ -0,0 +1,120 @@
import { Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { builds, createDb, eq, mcpServers } from '@bmm/db';
import { config } from './config.js';
import { generateSpec } from './lib/claude.js';
import { renderServerCode } from './lib/render.js';
import { dockerBuild, prepareBuildContext, staticCheck } from './lib/build.js';
import { allocatePort, deployContainer, dockerAvailable } from './lib/deploy.js';
import { emitDone, emitError, emitLog, emitStatus } from './lib/emit.js';
const db = createDb();
const connection = new Redis(config.REDIS_URL, { maxRetriesPerRequest: null });
interface JobData {
buildId: string;
serverId: string;
orgId: string;
prompt: string;
version: number;
slug: string;
serverName: string;
secrets: Record<string, string>;
}
export const worker = new Worker<JobData>(
'build',
async (job) => {
const { buildId, serverId, prompt, version, slug, secrets } = job.data;
const log = (level: 'info' | 'warn' | 'error', msg: string) => emitLog(buildId, level, msg);
try {
await db.update(builds).set({ status: 'generating', startedAt: new Date() }).where(eq(builds.id, buildId));
await db.update(mcpServers).set({ status: 'generating', updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
await emitStatus(buildId, 'generating');
await log('info', 'Generating MCP server spec...');
const { spec, source } = await generateSpec(prompt);
await log('info', `Spec generated via ${source} (${spec.tools.length} tool(s))`);
const generatedCode = renderServerCode(spec);
await db
.update(builds)
.set({ generatedSpec: spec, generatedCode })
.where(eq(builds.id, buildId));
await db.update(builds).set({ status: 'building' }).where(eq(builds.id, buildId));
await db.update(mcpServers).set({ status: 'building', toolsSchema: spec.tools, updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
await emitStatus(buildId, 'building');
await log('info', 'Preparing build context...');
const { contextDir, imageTag } = await prepareBuildContext(serverId, version, slug, generatedCode, spec);
await log('info', `Build context at ${contextDir}`);
await log('info', 'Running static checks...');
await staticCheck(contextDir);
await log('info', 'Static checks passed.');
const hasDocker = await dockerAvailable();
if (!hasDocker) {
await log('warn', 'Docker not available — skipping build/deploy. Server marked draft.');
await db.update(builds).set({ status: 'failed', errorMessage: 'docker_unavailable', finishedAt: new Date() }).where(eq(builds.id, buildId));
await db.update(mcpServers).set({ status: 'failed', updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
await emitDone(buildId, 'failed', serverId, null);
return;
}
await log('info', `Building Docker image ${imageTag}...`);
await dockerBuild(contextDir, imageTag, (line) => {
emitLog(buildId, 'info', line).catch(() => undefined);
});
await log('info', 'Image built.');
await db.update(builds).set({ status: 'deploying' }).where(eq(builds.id, buildId));
await db.update(mcpServers).set({ status: 'deploying', updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
await emitStatus(buildId, 'deploying');
const port = await allocatePort();
const publicUrl = `http://${config.RUNNER_HOST}:${port}`;
const envVars: Record<string, string> = {
...secrets,
PUBLIC_URL: publicUrl,
CONTROL_PLANE_URL: config.CONTROL_PLANE_URL,
PORT: '3000',
SERVER_ID: serverId,
};
const handle = await deployContainer({ serverId, slug, hostPort: port, imageTag, envVars });
await log('info', `Container ${handle.containerId.slice(0, 12)} running at ${handle.publicUrl}`);
await db
.update(builds)
.set({ status: 'success', finishedAt: new Date() })
.where(eq(builds.id, buildId));
await db
.update(mcpServers)
.set({ status: 'live', currentVersion: version, publicUrl: handle.publicUrl, updatedAt: new Date() })
.where(eq(mcpServers.id, serverId));
await emitStatus(buildId, 'success');
await emitDone(buildId, 'success', serverId, handle.publicUrl);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error('[worker] build failed:', err);
await db
.update(builds)
.set({ status: 'failed', errorMessage: msg, finishedAt: new Date() })
.where(eq(builds.id, buildId));
await db
.update(mcpServers)
.set({ status: 'failed', updatedAt: new Date() })
.where(eq(mcpServers.id, serverId));
await emitError(buildId, msg);
await emitDone(buildId, 'failed', serverId, null);
}
},
{ connection, concurrency: 2 },
);
worker.on('ready', () => console.log('[generator] worker ready'));
worker.on('failed', (job, err) => console.error('[generator] job failed', job?.id, err?.message));
worker.on('error', (err) => console.error('[generator] worker error', err.message));

View File

@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "./dist",
"rootDir": "./src",
"types": ["node"]
},
"include": ["src/**/*"]
}