mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-15 10:30:37 +09:00
Serialize sandbox callback bridge against concurrent heartbeats (#5326)
> **Stacked PR.** This PR's branch carries cumulative content from #5324 (bridge allowlist expand) and #5325 (env sanitization) — the mutex/sha256 logic in this PR sits on top of both. Reviewers should focus on the files this PR's commit touches: `packages/adapter-utils/src/sandbox-callback-bridge.{ts,test.ts}`, `packages/adapter-utils/src/ssh.ts`, and `packages/adapter-utils/src/ssh-fixture.test.ts`. Will rebase onto `master` and force-push once both prerequisite PRs are merged. ## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies > - Each agent that runs in a sandbox or via SSH talks back to the Paperclip server through a per-lease callback bridge whose entrypoint script is uploaded to the remote > - When two heartbeats target the same agent on the same machine concurrently, both upload the bridge entrypoint and both write to the same response files — producing torn-write races: `SyntaxError: Identifier 'randomUUID' has already been declared` from a concatenated upload, `mv: cannot stat …` from colliding `.json.tmp` writes, and 0-byte commits from a truncated stdin > - This pull request serializes those operations with a POSIX `mkdir`-mutex (PID liveness check + atomic rename) at the bridge entrypoint upload, applies the same lock to the bridge response writer, forwards stdin into remote ssh commands so the entrypoint payload arrives intact, and verifies a sha256 of the upload before promoting it > - The benefit is concurrent heartbeats no longer corrupt each other's bridge state ## What Changed - `packages/adapter-utils/src/sandbox-callback-bridge.ts`: serialize entrypoint upload and response writes via POSIX `mkdir`-mutex with PID liveness; sha256 the upload before promoting via `mv`; content-skip when the existing entrypoint already matches - `packages/adapter-utils/src/ssh.ts`: forward stdin into remote ssh commands through the SSH managed runtime so `cat > "$remote_upload"` actually receives the base64-encoded entrypoint - `packages/adapter-utils/src/ssh-fixture.test.ts`: cover the stdin-forwarded SSH path - `packages/adapter-utils/src/sandbox-callback-bridge.test.ts`: cover the mutex, content-skip, sha256-verify, and atomic-rename paths ## Verification - `pnpm vitest run --no-coverage --project @paperclipai/adapter-utils` - `pnpm typecheck` clean - Manual: two parallel heartbeats targeting the same SSH agent no longer race on the bridge entrypoint or response files ## Risks Medium. Serializing previously-parallel operations adds latency on the contended path (one heartbeat waits on another), bounded by the entrypoint upload time. The mutex includes PID liveness so a crashed heartbeat doesn't deadlock subsequent ones. Sha256-verify gives a clear "torn upload" failure mode instead of silent 0-byte commits. ## Model Used Claude Opus 4.7 (1M context) ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable — tests cover mutex + sha256-verify + stdin-forwarded ssh - [x] If this change affects the UI, I have included before/after screenshots — N/A (no UI) - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge
This commit is contained in:
parent
f6bad8f6bf
commit
50db8c01d2
4 changed files with 633 additions and 16 deletions
|
|
@ -61,6 +61,7 @@ export function createSshCommandManagedRuntimeRunner(input: {
|
|||
|
||||
try {
|
||||
const result = await runSshCommand(input.spec, remoteCommand, {
|
||||
stdin: commandInput.stdin,
|
||||
timeoutMs: commandInput.timeoutMs,
|
||||
maxBuffer: maxBufferBytes,
|
||||
});
|
||||
|
|
@ -205,6 +206,113 @@ async function execFileText(
|
|||
});
|
||||
}
|
||||
|
||||
async function spawnText(
|
||||
file: string,
|
||||
args: string[],
|
||||
options: {
|
||||
stdin?: string;
|
||||
timeout?: number;
|
||||
maxBuffer?: number;
|
||||
} = {},
|
||||
): Promise<SshCommandResult> {
|
||||
return await new Promise<SshCommandResult>((resolve, reject) => {
|
||||
const child = spawn(file, args, {
|
||||
stdio: [options.stdin != null ? "pipe" : "ignore", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
const maxBuffer = options.maxBuffer ?? 1024 * 128;
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
let settled = false;
|
||||
let timedOut = false;
|
||||
|
||||
const finishReject = (error: Error & { stdout?: string; stderr?: string; code?: number | null; killed?: boolean }) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
error.stdout = stdout;
|
||||
error.stderr = stderr;
|
||||
error.killed = timedOut;
|
||||
reject(error);
|
||||
};
|
||||
|
||||
const append = (
|
||||
streamName: "stdout" | "stderr",
|
||||
chunk: unknown,
|
||||
) => {
|
||||
const text = String(chunk);
|
||||
if (streamName === "stdout") {
|
||||
stdout += text;
|
||||
} else {
|
||||
stderr += text;
|
||||
}
|
||||
if (Buffer.byteLength(stdout, "utf8") > maxBuffer || Buffer.byteLength(stderr, "utf8") > maxBuffer) {
|
||||
child.kill("SIGTERM");
|
||||
finishReject(Object.assign(new Error(`Process output exceeded maxBuffer of ${maxBuffer} bytes.`), {
|
||||
code: null,
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
let killEscalation: NodeJS.Timeout | null = null;
|
||||
const timeout = options.timeout && options.timeout > 0
|
||||
? setTimeout(() => {
|
||||
timedOut = true;
|
||||
child.kill("SIGTERM");
|
||||
// Escalate to SIGKILL after a 5s grace window so a hung remote
|
||||
// command that ignores SIGTERM cannot keep the child alive
|
||||
// indefinitely.
|
||||
killEscalation = setTimeout(() => {
|
||||
try {
|
||||
child.kill("SIGKILL");
|
||||
} catch {
|
||||
// child may have already exited between the SIGTERM and the
|
||||
// escalation — that's fine.
|
||||
}
|
||||
}, 5_000);
|
||||
killEscalation.unref?.();
|
||||
}, options.timeout)
|
||||
: null;
|
||||
|
||||
const clearTimers = () => {
|
||||
if (timeout) clearTimeout(timeout);
|
||||
if (killEscalation) clearTimeout(killEscalation);
|
||||
};
|
||||
|
||||
child.stdout?.on("data", (chunk) => {
|
||||
append("stdout", chunk);
|
||||
});
|
||||
child.stderr?.on("data", (chunk) => {
|
||||
append("stderr", chunk);
|
||||
});
|
||||
|
||||
child.on("error", (error) => {
|
||||
clearTimers();
|
||||
finishReject(Object.assign(error, { code: null }));
|
||||
});
|
||||
|
||||
child.on("close", (code, signal) => {
|
||||
clearTimers();
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (code === 0) {
|
||||
resolve({ stdout, stderr });
|
||||
return;
|
||||
}
|
||||
reject(Object.assign(new Error(stderr.trim() || stdout.trim() || `Process exited with code ${code ?? -1}`), {
|
||||
stdout,
|
||||
stderr,
|
||||
code,
|
||||
signal,
|
||||
killed: timedOut,
|
||||
}));
|
||||
});
|
||||
|
||||
if (options.stdin != null && child.stdin) {
|
||||
child.stdin.end(options.stdin);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function runLocalGit(
|
||||
localDir: string,
|
||||
args: string[],
|
||||
|
|
@ -722,6 +830,7 @@ export async function runSshCommand(
|
|||
remoteCommand: string,
|
||||
options: {
|
||||
env?: Record<string, string>;
|
||||
stdin?: string;
|
||||
timeoutMs?: number;
|
||||
maxBuffer?: number;
|
||||
} = {},
|
||||
|
|
@ -760,10 +869,16 @@ export async function runSshCommand(
|
|||
`sh -lc ${shellQuote(remoteScript)}`,
|
||||
);
|
||||
|
||||
return await execFileText("ssh", sshArgs, {
|
||||
timeout: options.timeoutMs ?? 15_000,
|
||||
maxBuffer: options.maxBuffer ?? 1024 * 128,
|
||||
});
|
||||
return options.stdin != null
|
||||
? await spawnText("ssh", sshArgs, {
|
||||
stdin: options.stdin,
|
||||
timeout: options.timeoutMs ?? 15_000,
|
||||
maxBuffer: options.maxBuffer ?? 1024 * 128,
|
||||
})
|
||||
: await execFileText("ssh", sshArgs, {
|
||||
timeout: options.timeoutMs ?? 15_000,
|
||||
maxBuffer: options.maxBuffer ?? 1024 * 128,
|
||||
});
|
||||
} finally {
|
||||
await cleanup();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue