import { Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { GeneratorSpec } from '@bmm/types';
import { builds, createDb, eq, mcpServers } from '@bmm/db';
import { config } from './config.js';
import { generateSpec } from './lib/claude.js';
import { renderServerCode } from './lib/render.js';
import { dockerBuild, prepareBuildContext, staticCheck } from './lib/build.js';
import { allocatePort, deployContainer, dockerAvailable } from './lib/deploy.js';
import { emitDone, emitError, emitLog, emitStatus } from './lib/emit.js';

const db = createDb();

// Connection handed to the BullMQ worker below.
const connection = new Redis(config.REDIS_URL, { maxRetriesPerRequest: null });

// Separate client used only by `loadCachedSpec` to read `preview:*` keys.
const cacheReader = new Redis(config.REDIS_URL, { maxRetriesPerRequest: null });

/**
 * Payload of a job on the `build` queue.
 *
 * `orgId` and `serverName` are part of the payload but are not read by the
 * processor in this file.
 */
interface JobData {
  buildId: string;
  serverId: string;
  orgId: string;
  prompt: string;
  version: number;
  slug: string;
  serverName: string;
  /** Spread into the deployed container's environment variables. */
  secrets: Record<string, string>;
  /** When set, the processor first tries to re-use the spec cached under `preview:<previewId>`. */
  previewId?: string;
}

/**
 * Loads a previously generated spec from Redis.
 *
 * @param previewId - Identifier used to build the `preview:<previewId>` key.
 * @returns The validated spec, or `null` when the key is missing, the stored
 *   value is not valid JSON, or it fails `GeneratorSpec` validation.
 */
async function loadCachedSpec(previewId: string): Promise<GeneratorSpec | null> {
  const raw = await cacheReader.get(`preview:${previewId}`);
  if (!raw) return null;
  try {
    const parsed = GeneratorSpec.safeParse(JSON.parse(raw));
    return parsed.success ? parsed.data : null;
  } catch {
    // JSON.parse threw: treat a corrupt cache entry as a cache miss.
    return null;
  }
}

/**
 * Records a failed build and notifies stream consumers.
 *
 * Every step is attempted even when an earlier one throws, so that a failing
 * DB write does not prevent the `error`/`done` events from being emitted.
 *
 * @param buildId - Build row to mark as failed.
 * @param serverId - Server row to mark as failed.
 * @param message - Error message stored on the build and sent to consumers.
 * @throws The first error raised by any reporting step, after all steps ran.
 */
async function reportFailure(buildId: string, serverId: string, message: string): Promise<void> {
  const steps: Array<() => PromiseLike<unknown>> = [
    () =>
      db
        .update(builds)
        .set({ status: 'failed', errorMessage: message, finishedAt: new Date() })
        .where(eq(builds.id, buildId)),
    () =>
      db
        .update(mcpServers)
        .set({ status: 'failed', updatedAt: new Date() })
        .where(eq(mcpServers.id, serverId)),
    () => emitError(buildId, message),
    () => emitDone(buildId, 'failed', serverId, null),
  ];

  const failures: unknown[] = [];
  for (const step of steps) {
    try {
      await step();
    } catch (stepErr) {
      console.error('[worker] failed to record build failure:', stepErr);
      failures.push(stepErr);
    }
  }

  // Preserve the original contract: the processor rejects if reporting itself failed.
  if (failures.length > 0) throw failures[0];
}

/**
 * Worker for the `build` queue.
 *
 * For each job it: obtains a spec (cached preview or `generateSpec`), renders
 * server code, prepares a build context, runs static checks, builds a Docker
 * image, deploys a container, and records status transitions in `builds` and
 * `mcpServers` while emitting status/log events for the build.
 *
 * Errors thrown inside the pipeline are caught, recorded as a failed build,
 * and not rethrown; only an error while recording the failure rejects the job.
 */
export const worker = new Worker<JobData>(
  'build',
  async (job) => {
    const { buildId, serverId, prompt, version, slug, secrets, previewId } = job.data;
    const log = (level: 'info' | 'warn' | 'error', msg: string) => emitLog(buildId, level, msg);

    try {
      // --- generating ---------------------------------------------------
      await db.update(builds).set({ status: 'generating', startedAt: new Date() }).where(eq(builds.id, buildId));
      await db.update(mcpServers).set({ status: 'generating', updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
      await emitStatus(buildId, 'generating');

      let spec: GeneratorSpec | null = null;
      let source: 'claude' | 'mock' | 'cached' = 'mock';

      if (previewId) {
        spec = await loadCachedSpec(previewId);
        if (spec) {
          source = 'cached';
          await log('info', `Re-using preview spec ${previewId} (skipping Claude call)`);
        } else {
          await log('warn', `Preview ${previewId} cache miss — regenerating`);
        }
      }

      if (!spec) {
        await log('info', 'Generating MCP server spec...');
        const result = await generateSpec(prompt);
        spec = result.spec;
        source = result.source;
      }

      await log('info', `Spec ready via ${source} (${spec.tools.length} tool(s))`);

      const generatedCode = renderServerCode(spec);
      await db
        .update(builds)
        .set({ generatedSpec: spec, generatedCode })
        .where(eq(builds.id, buildId));

      // --- building -----------------------------------------------------
      await db.update(builds).set({ status: 'building' }).where(eq(builds.id, buildId));
      await db.update(mcpServers).set({ status: 'building', toolsSchema: spec.tools, updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
      await emitStatus(buildId, 'building');

      await log('info', 'Preparing build context...');
      const { contextDir, imageTag } = await prepareBuildContext(serverId, version, slug, generatedCode, spec);
      await log('info', `Build context at ${contextDir}`);

      await log('info', 'Running static checks...');
      await staticCheck(contextDir);
      await log('info', 'Static checks passed.');

      const hasDocker = await dockerAvailable();
      if (!hasDocker) {
        // NOTE(review): the log text says "marked draft" but both rows are set
        // to 'failed' below — confirm which of the two is intended.
        await log('warn', 'Docker not available — skipping build/deploy. Server marked draft.');
        await db.update(builds).set({ status: 'failed', errorMessage: 'docker_unavailable', finishedAt: new Date() }).where(eq(builds.id, buildId));
        await db.update(mcpServers).set({ status: 'failed', updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
        await emitDone(buildId, 'failed', serverId, null);
        return;
      }

      await log('info', `Building Docker image ${imageTag}...`);
      await dockerBuild(contextDir, imageTag, (line) => {
        // Best-effort: a failure to forward one build-output line is ignored.
        emitLog(buildId, 'info', line).catch(() => undefined);
      });
      await log('info', 'Image built.');

      // --- deploying ----------------------------------------------------
      await db.update(builds).set({ status: 'deploying' }).where(eq(builds.id, buildId));
      await db.update(mcpServers).set({ status: 'deploying', updatedAt: new Date() }).where(eq(mcpServers.id, serverId));
      await emitStatus(buildId, 'deploying');

      const port = await allocatePort();
      const publicUrl = `http://${config.RUNNER_HOST}:${port}`;
      // Job secrets come first so the fixed keys below take precedence on collision.
      const envVars: Record<string, string> = {
        ...secrets,
        PUBLIC_URL: publicUrl,
        CONTROL_PLANE_URL: config.CONTROL_PLANE_URL,
        OAUTH_ISSUER: `${config.CONTROL_PLANE_PUBLIC_URL}/oauth`,
        PORT: '3000',
        SERVER_ID: serverId,
      };

      const handle = await deployContainer({ serverId, slug, hostPort: port, imageTag, envVars });
      await log('info', `Container ${handle.containerId.slice(0, 12)} running at ${handle.publicUrl}`);

      // --- success ------------------------------------------------------
      await db
        .update(builds)
        .set({ status: 'success', finishedAt: new Date() })
        .where(eq(builds.id, buildId));
      await db
        .update(mcpServers)
        .set({ status: 'live', currentVersion: version, publicUrl: handle.publicUrl, updatedAt: new Date() })
        .where(eq(mcpServers.id, serverId));
      await emitStatus(buildId, 'success');
      await emitDone(buildId, 'success', serverId, handle.publicUrl);
    } catch (err) {
      const msg = err instanceof Error ? err.message : String(err);
      console.error('[worker] build failed:', err);
      await reportFailure(buildId, serverId, msg);
    }
  },
  { connection, concurrency: 2 },
);

worker.on('ready', () => console.log('[generator] worker ready'));
worker.on('failed', (job, err) => console.error('[generator] job failed', job?.id, err?.message));
worker.on('error', (err) => console.error('[generator] worker error', err.message));