Limit isolated workspace memory spikes

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-04-06 08:35:59 -05:00
parent 37d2d5ef02
commit 0a9a8b5a44
4 changed files with 238 additions and 16 deletions

View file

@ -176,4 +176,49 @@ describeEmbeddedPostgres("runDatabaseBackup", () => {
},
60_000,
);
// Restores a hand-written backup whose first lines are plain comments (no
// breakpoint yet) followed by breakpoint-delimited statements, one of which
// spans two lines, then checks that the inserted row is present.
it(
  "restores statements incrementally when backup comments precede the first breakpoint",
  async () => {
    const targetConnectionString = await createTempDatabase();
    const targetSql = postgres(targetConnectionString, { max: 1, onnotice: () => {} });
    const scratchDir = createTempDir("paperclip-db-restore-manual-");
    const backupFile = path.join(scratchDir, "manual.sql");

    const breakpointLine = "-- paperclip statement breakpoint 69f6f3f1-42fd-46a6-bf17-d1d85f8f3900";
    const backupContents = [
      "-- Paperclip database backup",
      "-- Created: 2026-04-06T00:00:00.000Z",
      "",
      "BEGIN;",
      breakpointLine,
      "CREATE TABLE public.restore_stream_test (id integer primary key, payload text not null);",
      breakpointLine,
      "INSERT INTO public.restore_stream_test (id, payload)",
      "VALUES (1, 'hello');",
      breakpointLine,
      "COMMIT;",
      breakpointLine,
    ].join("\n");

    try {
      await fs.promises.writeFile(backupFile, backupContents, "utf8");

      await runDatabaseRestore({ connectionString: targetConnectionString, backupFile });

      const restoredRows = await targetSql.unsafe<{ payload: string }[]>(`
SELECT payload
FROM public.restore_stream_test
`);
      expect(restoredRows).toEqual([{ payload: "hello" }]);
    } finally {
      // Always release the single pooled connection, even when an assertion fails.
      await targetSql.end();
    }
  },
  20_000,
);
});

View file

@ -1,6 +1,6 @@
import { createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs";
import { readFile } from "node:fs/promises";
import { createReadStream, createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs";
import { basename, resolve } from "node:path";
import { createInterface } from "node:readline";
import postgres from "postgres";
export type RunDatabaseBackupOptions = {
@ -142,6 +142,42 @@ function tableKey(schemaName: string, tableName: string): string {
return `${schemaName}.${tableName}`;
}
/**
 * Streams a backup file line by line and yields one SQL statement at a time,
 * so the whole file never has to be held in memory.
 *
 * Lines are accumulated until a line exactly equal to `STATEMENT_BREAKPOINT`
 * is read; the accumulated lines are then joined with "\n", trimmed, and
 * yielded if non-empty. Whatever remains after the last breakpoint is yielded
 * the same way.
 *
 * @param backupFile Path of the backup file to read (decoded as UTF-8).
 * @returns An async generator of trimmed, non-empty statements in file order.
 */
async function* readRestoreStatements(backupFile: string): AsyncGenerator<string> {
  const input = createReadStream(backupFile, { encoding: "utf8" });
  // crlfDelay: Infinity makes readline treat "\r\n" as a single line break.
  const lines = createInterface({ input, crlfDelay: Infinity });
  let pending: string[] = [];

  try {
    for await (const line of lines) {
      if (line !== STATEMENT_BREAKPOINT) {
        pending.push(line);
        continue;
      }

      const statement = pending.join("\n").trim();
      pending = [];
      if (statement.length > 0) {
        yield statement;
      }
    }

    // The file may end without a closing breakpoint; emit what is left.
    const remainder = pending.join("\n").trim();
    pending = [];
    if (remainder.length > 0) {
      yield remainder;
    }
  } finally {
    // Runs on normal completion, on error, and when the consumer stops early.
    lines.close();
    input.destroy();
  }
}
export function createBufferedTextFileWriter(filePath: string, maxBufferedBytes = DEFAULT_BACKUP_WRITE_BUFFER_BYTES) {
const stream = createWriteStream(filePath, { encoding: "utf8" });
const flushThreshold = Math.max(1, Math.trunc(maxBufferedBytes));
@ -626,13 +662,7 @@ export async function runDatabaseRestore(opts: RunDatabaseRestoreOptions): Promi
try {
await sql`SELECT 1`;
const contents = await readFile(opts.backupFile, "utf8");
const statements = contents
.split(STATEMENT_BREAKPOINT)
.map((statement) => statement.trim())
.filter((statement) => statement.length > 0);
for (const statement of statements) {
for await (const statement of readRestoreStatements(opts.backupFile)) {
await sql.unsafe(statement).execute();
}
} catch (error) {

View file

@ -957,6 +957,57 @@ describe("realizeExecutionWorkspace", () => {
expect(operations[1]?.command).toBe("bash ./scripts/provision.sh");
});
// Commits a provision script that prints 400000 characters to stdout, realizes
// a git-worktree workspace that runs it, and checks that the recorded
// provision operation holds truncated stdout plus truncation metadata.
it("truncates oversized provision command output before storing it in memory", async () => {
  const repoRoot = await createTempRepo();
  const { recorder, operations } = createWorkspaceOperationRecorderDouble();

  const scriptsDir = path.join(repoRoot, "scripts");
  await fs.mkdir(scriptsDir, { recursive: true });
  await fs.writeFile(
    path.join(scriptsDir, "noisy.js"),
    'process.stdout.write("x".repeat(400000));\n',
    "utf8",
  );
  await runGit(repoRoot, ["add", "scripts/noisy.js"]);
  await runGit(repoRoot, ["commit", "-m", "Add noisy provision script"]);

  const base = {
    baseCwd: repoRoot,
    source: "project_primary",
    projectId: "project-1",
    workspaceId: "workspace-1",
    repoUrl: null,
    repoRef: "HEAD",
  };
  const config = {
    workspaceStrategy: {
      type: "git_worktree",
      branchTemplate: "{{issue.identifier}}-{{slug}}",
      provisionCommand: "node ./scripts/noisy.js",
    },
  };
  const issue = {
    id: "issue-1",
    identifier: "PAP-1142",
    title: "Limit noisy provision output",
  };
  const agent = {
    id: "agent-1",
    name: "Codex Coder",
    companyId: "company-1",
  };

  await realizeExecutionWorkspace({ base, config, issue, agent, recorder });

  const provisionResult = operations.find(
    (operation) => operation.phase === "workspace_provision",
  )?.result;

  expect(provisionResult?.metadata).toMatchObject({
    stdoutTruncated: true,
    stderrTruncated: false,
  });
  expect(provisionResult?.stdout).toContain("[output truncated to last");
  expect(provisionResult?.stdout?.length ?? 0).toBeLessThan(300000);
});
it("reuses an existing branch without resetting it when recreating a missing worktree", async () => {
const repoRoot = await createTempRepo();
const branchName = "PAP-450-recreate-missing-worktree";

View file

@ -102,6 +102,18 @@ interface RuntimeServiceRecord extends RuntimeServiceRef {
const runtimeServicesById = new Map<string, RuntimeServiceRecord>();
const runtimeServicesByReuseKey = new Map<string, string>();
const runtimeServiceLeasesByRun = new Map<string, string[]>();
// Per-stream byte cap (256 KiB) applied to captured stdout/stderr when
// executeProcess is not given an explicit maxStdoutBytes / maxStderrBytes.
const DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES = 256 * 1024;
// Final result of capturing one output stream.
type ProcessOutputCapture = {
// Retained output; prefixed with a "[output truncated ...]" notice when truncated.
text: string;
// True when older output was dropped to stay within the byte limit.
truncated: boolean;
// Total UTF-8 byte count of everything appended, including dropped output.
totalBytes: number;
};
// Incremental collector for one output stream.
type ProcessOutputAccumulator = {
// Adds the next piece of output.
append(chunk: string): void;
// Produces the capture summary for everything appended so far.
finish(): ProcessOutputCapture;
};
export async function resetRuntimeServicesForTests() {
for (const record of runtimeServicesById.values()) {
@ -381,30 +393,96 @@ function formatCommandForDisplay(command: string, args: string[]) {
.join(" ");
}
/**
 * Creates an accumulator that keeps only the last `maxBytes` UTF-8 bytes of
 * the text appended to it, while counting the total bytes ever appended.
 *
 * When the buffered text exceeds the limit, the oldest bytes are dropped. The
 * cut is moved forward to the next UTF-8 character boundary so the retained
 * tail never starts in the middle of a multi-byte character (which would
 * otherwise decode to U+FFFD replacement characters). The retained text is
 * therefore at most `limit` bytes and may be slightly shorter.
 *
 * @param maxBytes Maximum number of bytes to retain; truncated to an integer
 *   and clamped to at least 1.
 * @returns An accumulator whose `finish()` reports the retained text (prefixed
 *   with a truncation notice when output was dropped), the truncation flag,
 *   and the total byte count.
 */
function createProcessOutputCapture(maxBytes: number): ProcessOutputAccumulator {
  const limit = Math.max(1, Math.trunc(maxBytes));
  let chunks: string[] = [];
  // Byte size of `chunks`, tracked incrementally so append() does not have to
  // re-measure every buffered chunk on each call.
  let bufferedBytes = 0;
  let truncated = false;
  let totalBytes = 0;

  return {
    append(chunk: string) {
      if (!chunk) return;

      const chunkBytes = Buffer.byteLength(chunk, "utf8");
      chunks.push(chunk);
      totalBytes += chunkBytes;
      bufferedBytes += chunkBytes;
      if (bufferedBytes <= limit) return;

      const combined = Buffer.from(chunks.join(""), "utf8");
      let start = Math.max(0, combined.length - limit);
      // UTF-8 continuation bytes look like 0b10xxxxxx; skip them so the tail
      // begins on a lead byte instead of inside a character.
      while (start < combined.length && (combined.readUInt8(start) & 0xc0) === 0x80) {
        start += 1;
      }

      const tail = combined.subarray(start).toString("utf8");
      chunks = tail.length > 0 ? [tail] : [];
      bufferedBytes = combined.length - start;
      truncated = true;
    },
    finish(): ProcessOutputCapture {
      const text = chunks.join("");
      if (!truncated) {
        return {
          text,
          truncated: false,
          totalBytes,
        };
      }
      return {
        text: `[output truncated to last ${limit} bytes; total ${totalBytes} bytes]\n${text}`,
        truncated: true,
        totalBytes,
      };
    },
  };
}
/**
 * Spawns `input.command` with `input.args` in `input.cwd` (stdin ignored,
 * stdout and stderr piped, environment from `input.env` or `process.env`) and
 * settles when the child emits "close" (resolve) or "error" (reject).
 *
 * In the accumulator-based lines, each stream is collected through
 * createProcessOutputCapture with a cap of maxStdoutBytes / maxStderrBytes
 * (falling back to DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES), and the resolved
 * value carries the captured text, a truncation flag, and the total byte
 * count for each stream alongside the exit code.
 *
 * NOTE(review): this span appears to be a diff rendering in which superseded
 * lines (the narrower return type, string-concatenation capture, `return proc;`)
 * sit next to their replacements — confirm against the real file.
 */
async function executeProcess(input: {
command: string;
args: string[];
cwd: string;
env?: NodeJS.ProcessEnv;
}): Promise<{ stdout: string; stderr: string; code: number | null }> {
const proc = await new Promise<{ stdout: string; stderr: string; code: number | null }>((resolve, reject) => {
maxStdoutBytes?: number;
maxStderrBytes?: number;
}): Promise<{
stdout: string;
stderr: string;
code: number | null;
stdoutTruncated: boolean;
stderrTruncated: boolean;
stdoutBytes: number;
stderrBytes: number;
}> {
// The promise resolves with the live accumulators; they are finalized after the child closes.
const proc = await new Promise<{
stdout: ProcessOutputAccumulator;
stderr: ProcessOutputAccumulator;
code: number | null;
}>((resolve, reject) => {
const child = spawn(input.command, input.args, {
cwd: input.cwd,
stdio: ["ignore", "pipe", "pipe"],
env: input.env ?? process.env,
});
let stdout = "";
let stderr = "";
const stdout = createProcessOutputCapture(input.maxStdoutBytes ?? DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES);
const stderr = createProcessOutputCapture(input.maxStderrBytes ?? DEFAULT_EXECUTE_PROCESS_OUTPUT_BYTES);
// Each data chunk is stringified on arrival and handed to the matching capture.
child.stdout?.on("data", (chunk) => {
stdout += String(chunk);
stdout.append(String(chunk));
});
child.stderr?.on("data", (chunk) => {
stderr += String(chunk);
stderr.append(String(chunk));
});
child.on("error", reject);
child.on("close", (code) => resolve({ stdout, stderr, code }));
});
return proc;
// Finalize both captures once so text, truncation flag and byte total come from the same snapshot.
const stdout = proc.stdout.finish();
const stderr = proc.stderr.finish();
return {
stdout: stdout.text,
stderr: stderr.text,
code: proc.code,
stdoutTruncated: stdout.truncated,
stderrTruncated: stderr.truncated,
stdoutBytes: stdout.totalBytes,
stderrBytes: stderr.totalBytes,
};
}
async function runGit(args: string[], cwd: string): Promise<string> {
@ -588,6 +666,15 @@ async function recordGitOperation(
stdout: result.stdout,
stderr: result.stderr,
system: result.code === 0 ? input.successMessage ?? null : null,
metadata:
result.stdoutTruncated || result.stderrTruncated
? {
stdoutTruncated: result.stdoutTruncated,
stderrTruncated: result.stderrTruncated,
stdoutBytes: result.stdoutBytes,
stderrBytes: result.stderrBytes,
}
: null,
};
},
});
@ -646,6 +733,15 @@ async function recordWorkspaceCommandOperation(
stdout: result.stdout,
stderr: result.stderr,
system: result.code === 0 ? input.successMessage ?? null : null,
metadata:
result.stdoutTruncated || result.stderrTruncated
? {
stdoutTruncated: result.stdoutTruncated,
stderrTruncated: result.stderrTruncated,
stdoutBytes: result.stdoutBytes,
stderrBytes: result.stderrBytes,
}
: null,
};
},
});