From 0a9a8b5a44237a585c9ff04ac30c49460f338a7a Mon Sep 17 00:00:00 2001 From: dotta Date: Mon, 6 Apr 2026 08:35:59 -0500 Subject: [PATCH] Limit isolated workspace memory spikes Co-Authored-By: Paperclip --- packages/db/src/backup-lib.test.ts | 45 +++++++ packages/db/src/backup-lib.ts | 48 ++++++-- .../src/__tests__/workspace-runtime.test.ts | 51 ++++++++ server/src/services/workspace-runtime.ts | 110 ++++++++++++++++-- 4 files changed, 238 insertions(+), 16 deletions(-) diff --git a/packages/db/src/backup-lib.test.ts b/packages/db/src/backup-lib.test.ts index 4c59216b..dcdc87c5 100644 --- a/packages/db/src/backup-lib.test.ts +++ b/packages/db/src/backup-lib.test.ts @@ -176,4 +176,49 @@ describeEmbeddedPostgres("runDatabaseBackup", () => { }, 60_000, ); + + it( + "restores statements incrementally when backup comments precede the first breakpoint", + async () => { + const restoreConnectionString = await createTempDatabase(); + const restoreSql = postgres(restoreConnectionString, { max: 1, onnotice: () => {} }); + const backupDir = createTempDir("paperclip-db-restore-manual-"); + const backupFile = path.join(backupDir, "manual.sql"); + + try { + await fs.promises.writeFile( + backupFile, + [ + "-- Paperclip database backup", + "-- Created: 2026-04-06T00:00:00.000Z", + "", + "BEGIN;", + "-- paperclip statement breakpoint 69f6f3f1-42fd-46a6-bf17-d1d85f8f3900", + "CREATE TABLE public.restore_stream_test (id integer primary key, payload text not null);", + "-- paperclip statement breakpoint 69f6f3f1-42fd-46a6-bf17-d1d85f8f3900", + "INSERT INTO public.restore_stream_test (id, payload)", + "VALUES (1, 'hello');", + "-- paperclip statement breakpoint 69f6f3f1-42fd-46a6-bf17-d1d85f8f3900", + "COMMIT;", + "-- paperclip statement breakpoint 69f6f3f1-42fd-46a6-bf17-d1d85f8f3900", + ].join("\n"), + "utf8", + ); + + await runDatabaseRestore({ + connectionString: restoreConnectionString, + backupFile, + }); + + const rows = await restoreSql.unsafe<{ payload: string }[]>(` + SELECT payload + FROM public.restore_stream_test + `); + expect(rows).toEqual([{ payload: "hello" }]); + } finally { + await restoreSql.end(); + } + }, + 20_000, + ); }); diff --git a/packages/db/src/backup-lib.ts b/packages/db/src/backup-lib.ts index c148b8ba..50ea2cfb 100644 --- a/packages/db/src/backup-lib.ts +++ b/packages/db/src/backup-lib.ts @@ -1,6 +1,6 @@ -import { createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; -import { readFile } from "node:fs/promises"; +import { createReadStream, createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; import { basename, resolve } from "node:path"; +import { createInterface } from "node:readline"; import postgres from "postgres"; export type RunDatabaseBackupOptions = { @@ -142,6 +142,42 @@ function tableKey(schemaName: string, tableName: string): string { return `${schemaName}.${tableName}`; } +async function* readRestoreStatements(backupFile: string): AsyncGenerator { + const stream = createReadStream(backupFile, { encoding: "utf8" }); + const reader = createInterface({ + input: stream, + crlfDelay: Infinity, + }); + let statementLines: string[] = []; + + const flushStatement = () => { + const statement = statementLines.join("\n").trim(); + statementLines = []; + return statement; + }; + + try { + for await (const line of reader) { + if (line === STATEMENT_BREAKPOINT) { + const statement = flushStatement(); + if (statement.length > 0) { + yield statement; + } + continue; + } + statementLines.push(line); + } + + const trailingStatement = flushStatement(); + if (trailingStatement.length > 0) { + yield trailingStatement; + } + } finally { + reader.close(); + stream.destroy(); + } +} + export function createBufferedTextFileWriter(filePath: string, maxBufferedBytes = DEFAULT_BACKUP_WRITE_BUFFER_BYTES) { const stream = createWriteStream(filePath, { encoding: "utf8" }); const flushThreshold = Math.max(1, Math.trunc(maxBufferedBytes)); @@ -626,13 +662,7 @@ export async function runDatabaseRestore(opts: RunDatabaseRestoreOptions): Promi try { await sql`SELECT 1`; - const contents = await readFile(opts.backupFile, "utf8"); - const statements = contents - .split(STATEMENT_BREAKPOINT) - .map((statement) => statement.trim()) - .filter((statement) => statement.length > 0); - - for (const statement of statements) { + for await (const statement of readRestoreStatements(opts.backupFile)) { await sql.unsafe(statement).execute(); } } catch (error) { diff --git a/server/src/__tests__/workspace-runtime.test.ts b/server/src/__tests__/workspace-runtime.test.ts index 3e472046..b5e1dc31 100644 --- a/server/src/__tests__/workspace-runtime.test.ts +++ b/server/src/__tests__/workspace-runtime.test.ts @@ -957,6 +957,57 @@ describe("realizeExecutionWorkspace", () => { expect(operations[1]?.command).toBe("bash ./scripts/provision.sh"); }); + it("truncates oversized provision command output before storing it in memory", async () => { + const repoRoot = await createTempRepo(); + const { recorder, operations } = createWorkspaceOperationRecorderDouble(); + + await fs.mkdir(path.join(repoRoot, "scripts"), { recursive: true }); + await fs.writeFile( + path.join(repoRoot, "scripts", "noisy.js"), + 'process.stdout.write("x".repeat(400000));\n', + "utf8", + ); + await runGit(repoRoot, ["add", "scripts/noisy.js"]); + await runGit(repoRoot, ["commit", "-m", "Add noisy provision script"]); + + await realizeExecutionWorkspace({ + base: { + baseCwd: repoRoot, + source: "project_primary", + projectId: "project-1", + workspaceId: "workspace-1", + repoUrl: null, + repoRef: "HEAD", + }, + config: { + workspaceStrategy: { + type: "git_worktree", + branchTemplate: "{{issue.identifier}}-{{slug}}", + provisionCommand: "node ./scripts/noisy.js", + }, + }, + issue: { + id: "issue-1", + identifier: "PAP-1142", + title: "Limit noisy provision output", + }, + agent: { + id: "agent-1", + name: "Codex Coder", + companyId: "company-1", + }, + recorder, + }); + + const provisionOperation = operations.find((operation) => operation.phase === "workspace_provision"); + expect(provisionOperation?.result.metadata).toMatchObject({ + stdoutTruncated: true, + stderrTruncated: false, + }); + expect(provisionOperation?.result.stdout).toContain("[output truncated to last"); + expect(provisionOperation?.result.stdout?.length ?? 0).toBeLessThan(300000); + }); + it("reuses an existing branch without resetting it when recreating a missing worktree", async () => { const repoRoot = await createTempRepo(); const branchName = "PAP-450-recreate-missing-worktree"; diff --git a/server/src/services/workspace-runtime.ts b/server/src/services/workspace-runtime.ts index 176dde10..fc75d0d5 100644 --- a/server/src/services/workspace-runtime.ts +++ b/server/src/services/workspace-runtime.ts @@ -102,6 +102,18 @@ interface RuntimeServiceRecord extends RuntimeServiceRef { const runtimeServicesById = new Map(); const runtimeServicesByReuseKey = new Map(); const runtimeServiceLeasesByRun = new Map(); +const DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES = 256 * 1024; + +type ProcessOutputCapture = { + text: string; + truncated: boolean; + totalBytes: number; +}; + +type ProcessOutputAccumulator = { + append(chunk: string): void; + finish(): ProcessOutputCapture; +}; export async function resetRuntimeServicesForTests() { for (const record of runtimeServicesById.values()) { @@ -381,30 +393,96 @@ function formatCommandForDisplay(command: string, args: string[]) { .join(" "); } +function createProcessOutputCapture(maxBytes: number): ProcessOutputAccumulator { + const limit = Math.max(1, Math.trunc(maxBytes)); + let chunks: string[] = []; + let truncated = false; + let totalBytes = 0; + + return { + append(chunk: string) { + if (!chunk) return; + chunks.push(chunk); + totalBytes += Buffer.byteLength(chunk, "utf8"); + + let currentBytes = chunks.reduce((sum, value) => sum + Buffer.byteLength(value, "utf8"), 0); + if (currentBytes <= limit) return; + + const combined = Buffer.from(chunks.join(""), "utf8"); + const tail = combined.subarray(Math.max(0, combined.length - limit)).toString("utf8"); + chunks = [tail]; + truncated = true; + currentBytes = Buffer.byteLength(tail, "utf8"); + if (currentBytes > limit) { + chunks = [Buffer.from(tail, "utf8").subarray(Math.max(0, currentBytes - limit)).toString("utf8")]; + } + }, + finish(): ProcessOutputCapture { + const text = chunks.join(""); + if (!truncated) { + return { + text, + truncated: false, + totalBytes, + }; + } + return { + text: `[output truncated to last ${limit} bytes; total ${totalBytes} bytes]\n${text}`, + truncated: true, + totalBytes, + }; + }, + }; +} + async function executeProcess(input: { command: string; args: string[]; cwd: string; env?: NodeJS.ProcessEnv; -}): Promise<{ stdout: string; stderr: string; code: number | null }> { - const proc = await new Promise<{ stdout: string; stderr: string; code: number | null }>((resolve, reject) => { + maxStdoutBytes?: number; + maxStderrBytes?: number; +}): Promise<{ + stdout: string; + stderr: string; + code: number | null; + stdoutTruncated: boolean; + stderrTruncated: boolean; + stdoutBytes: number; + stderrBytes: number; +}> { + const proc = await new Promise<{ + stdout: ProcessOutputAccumulator; + stderr: ProcessOutputAccumulator; + code: number | null; + }>((resolve, reject) => { const child = spawn(input.command, input.args, { cwd: input.cwd, stdio: ["ignore", "pipe", "pipe"], env: input.env ?? process.env, }); - let stdout = ""; - let stderr = ""; + const stdout = createProcessOutputCapture(input.maxStdoutBytes ?? DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES); + const stderr = createProcessOutputCapture(input.maxStderrBytes ?? DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES); child.stdout?.on("data", (chunk) => { - stdout += String(chunk); + stdout.append(String(chunk)); }); child.stderr?.on("data", (chunk) => { - stderr += String(chunk); + stderr.append(String(chunk)); }); child.on("error", reject); child.on("close", (code) => resolve({ stdout, stderr, code })); }); - return proc; + const stdout = proc.stdout.finish(); + const stderr = proc.stderr.finish(); + return { + stdout: stdout.text, + stderr: stderr.text, + code: proc.code, + stdoutTruncated: stdout.truncated, + stderrTruncated: stderr.truncated, + stdoutBytes: stdout.totalBytes, + stderrBytes: stderr.totalBytes, + }; } async function runGit(args: string[], cwd: string): Promise { @@ -588,6 +666,15 @@ async function recordGitOperation( stdout: result.stdout, stderr: result.stderr, system: result.code === 0 ? input.successMessage ?? null : null, + metadata: + result.stdoutTruncated || result.stderrTruncated + ? { + stdoutTruncated: result.stdoutTruncated, + stderrTruncated: result.stderrTruncated, + stdoutBytes: result.stdoutBytes, + stderrBytes: result.stderrBytes, + } + : null, }; }, }); @@ -646,6 +733,15 @@ async function recordWorkspaceCommandOperation( stdout: result.stdout, stderr: result.stderr, system: result.code === 0 ? input.successMessage ?? null : null, + metadata: + result.stdoutTruncated || result.stderrTruncated + ? { + stdoutTruncated: result.stdoutTruncated, + stderrTruncated: result.stderrTruncated, + stdoutBytes: result.stdoutBytes, + stderrBytes: result.stderrBytes, + } + : null, }; }, });