import { randomUUID } from "node:crypto"; import fs from "node:fs"; import net from "node:net"; import os from "node:os"; import path from "node:path"; import { createServer } from "node:http"; import { and, asc, eq } from "drizzle-orm"; import { WebSocketServer } from "ws"; import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { agents, agentWakeupRequests, applyPendingMigrations, companies, createDb, ensurePostgresDatabase, heartbeatRuns, issueComments, issues, } from "@paperclipai/db"; import { heartbeatService } from "../services/heartbeat.ts"; type EmbeddedPostgresInstance = { initialise(): Promise; start(): Promise; stop(): Promise; }; type EmbeddedPostgresCtor = new (opts: { databaseDir: string; user: string; password: string; port: number; persistent: boolean; initdbFlags?: string[]; onLog?: (message: unknown) => void; onError?: (message: unknown) => void; }) => EmbeddedPostgresInstance; async function getEmbeddedPostgresCtor(): Promise { const mod = await import("embedded-postgres"); return mod.default as EmbeddedPostgresCtor; } async function getAvailablePort(): Promise { return await new Promise((resolve, reject) => { const server = net.createServer(); server.unref(); server.on("error", reject); server.listen(0, "127.0.0.1", () => { const address = server.address(); if (!address || typeof address === "string") { server.close(() => reject(new Error("Failed to allocate test port"))); return; } const { port } = address; server.close((error) => { if (error) reject(error); else resolve(port); }); }); }); } async function startTempDatabase() { const dataDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-heartbeat-comment-wake-")); const port = await getAvailablePort(); const EmbeddedPostgres = await getEmbeddedPostgresCtor(); const instance = new EmbeddedPostgres({ databaseDir: dataDir, user: "paperclip", password: "paperclip", port, persistent: true, initdbFlags: ["--encoding=UTF8", "--locale=C", "--lc-messages=C"], onLog: () => {}, onError: () => {}, }); await instance.initialise(); await instance.start(); const adminConnectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/postgres`; await ensurePostgresDatabase(adminConnectionString, "paperclip"); const connectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/paperclip`; await applyPendingMigrations(connectionString); return { connectionString, instance, dataDir }; } async function waitFor(condition: () => boolean | Promise, timeoutMs = 10_000, intervalMs = 50) { const startedAt = Date.now(); while (Date.now() - startedAt < timeoutMs) { if (await condition()) return; await new Promise((resolve) => setTimeout(resolve, intervalMs)); } throw new Error("Timed out waiting for condition"); } async function createControlledGatewayServer() { const server = createServer(); const wss = new WebSocketServer({ server }); const agentPayloads: Array> = []; let firstWaitRelease: (() => void) | null = null; let firstWaitGate = new Promise((resolve) => { firstWaitRelease = resolve; }); let waitCount = 0; wss.on("connection", (socket) => { socket.send( JSON.stringify({ type: "event", event: "connect.challenge", payload: { nonce: "nonce-123" }, }), ); socket.on("message", async (raw) => { const text = Buffer.isBuffer(raw) ? raw.toString("utf8") : String(raw); const frame = JSON.parse(text) as { type: string; id: string; method: string; params?: Record; }; if (frame.type !== "req") return; if (frame.method === "connect") { socket.send( JSON.stringify({ type: "res", id: frame.id, ok: true, payload: { type: "hello-ok", protocol: 3, server: { version: "test", connId: "conn-1" }, features: { methods: ["connect", "agent", "agent.wait"], events: ["agent"] }, snapshot: { version: 1, ts: Date.now() }, policy: { maxPayload: 1_000_000, maxBufferedBytes: 1_000_000, tickIntervalMs: 30_000 }, }, }), ); return; } if (frame.method === "agent") { agentPayloads.push((frame.params ?? {}) as Record); const runId = typeof frame.params?.idempotencyKey === "string" ? frame.params.idempotencyKey : `run-${agentPayloads.length}`; socket.send( JSON.stringify({ type: "res", id: frame.id, ok: true, payload: { runId, status: "accepted", acceptedAt: Date.now(), }, }), ); return; } if (frame.method === "agent.wait") { waitCount += 1; if (waitCount === 1) { await firstWaitGate; } socket.send( JSON.stringify({ type: "res", id: frame.id, ok: true, payload: { runId: frame.params?.runId, status: "ok", startedAt: 1, endedAt: 2, }, }), ); } }); }); await new Promise((resolve) => { server.listen(0, "127.0.0.1", () => resolve()); }); const address = server.address(); if (!address || typeof address === "string") { throw new Error("Failed to resolve test server address"); } return { url: `ws://127.0.0.1:${address.port}`, getAgentPayloads: () => agentPayloads, releaseFirstWait: () => { firstWaitRelease?.(); firstWaitRelease = null; firstWaitGate = Promise.resolve(); }, close: async () => { await new Promise((resolve) => wss.close(() => resolve())); await new Promise((resolve) => server.close(() => resolve())); }, }; } describe("heartbeat comment wake batching", () => { let db!: ReturnType; let instance: EmbeddedPostgresInstance | null = null; let dataDir = ""; beforeAll(async () => { const started = await startTempDatabase(); db = createDb(started.connectionString); instance = started.instance; dataDir = started.dataDir; }, 45_000); afterAll(async () => { await instance?.stop(); if (dataDir) { fs.rmSync(dataDir, { recursive: true, force: true }); } }); it("batches deferred comment wakes and forwards the ordered batch to the next run", async () => { const gateway = await createControlledGatewayServer(); const companyId = randomUUID(); const agentId = randomUUID(); const issueId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; const heartbeat = heartbeatService(db); try { await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "Gateway Agent", role: "engineer", status: "idle", adapterType: "openclaw_gateway", adapterConfig: { url: gateway.url, headers: { "x-openclaw-token": "gateway-token", }, payloadTemplate: { message: "wake now", }, waitTimeoutMs: 2_000, }, runtimeConfig: {}, permissions: {}, }); await db.insert(issues).values({ id: issueId, companyId, title: "Batch wake comments", status: "todo", priority: "medium", assigneeAgentId: agentId, issueNumber: 1, identifier: `${issuePrefix}-1`, }); const comment1 = await db .insert(issueComments) .values({ companyId, issueId, authorUserId: "user-1", body: "First comment", }) .returning() .then((rows) => rows[0]); const firstRun = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_commented", payload: { issueId, commentId: comment1.id }, contextSnapshot: { issueId, taskId: issueId, commentId: comment1.id, wakeReason: "issue_commented", }, requestedByActorType: "user", requestedByActorId: "user-1", }); expect(firstRun).not.toBeNull(); await waitFor(() => gateway.getAgentPayloads().length === 1); await db.insert(issueComments).values({ companyId, issueId, authorAgentId: agentId, createdByRunId: firstRun?.id ?? null, body: "Heartbeat acknowledged", }); const comment2 = await db .insert(issueComments) .values({ companyId, issueId, authorUserId: "user-1", body: "Second comment", }) .returning() .then((rows) => rows[0]); const comment3 = await db .insert(issueComments) .values({ companyId, issueId, authorUserId: "user-1", body: "Third comment", }) .returning() .then((rows) => rows[0]); const secondRun = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_commented", payload: { issueId, commentId: comment2.id }, contextSnapshot: { issueId, taskId: issueId, commentId: comment2.id, wakeReason: "issue_commented", }, requestedByActorType: "user", requestedByActorId: "user-1", }); const thirdRun = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_commented", payload: { issueId, commentId: comment3.id }, contextSnapshot: { issueId, taskId: issueId, commentId: comment3.id, wakeReason: "issue_commented", }, requestedByActorType: "user", requestedByActorId: "user-1", }); expect(secondRun).toBeNull(); expect(thirdRun).toBeNull(); await waitFor(async () => { const deferred = await db .select() .from(agentWakeupRequests) .where( and( eq(agentWakeupRequests.companyId, companyId), eq(agentWakeupRequests.agentId, agentId), eq(agentWakeupRequests.status, "deferred_issue_execution"), ), ) .then((rows) => rows[0] ?? null); return Boolean(deferred); }); const deferredWake = await db .select() .from(agentWakeupRequests) .where( and( eq(agentWakeupRequests.companyId, companyId), eq(agentWakeupRequests.agentId, agentId), eq(agentWakeupRequests.status, "deferred_issue_execution"), ), ) .then((rows) => rows[0] ?? null); const deferredContext = (deferredWake?.payload as Record | null)?._paperclipWakeContext as | Record | undefined; expect(deferredContext?.wakeCommentIds).toEqual([comment2.id, comment3.id]); gateway.releaseFirstWait(); await waitFor(() => gateway.getAgentPayloads().length === 2); await waitFor(async () => { const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId)); return runs.length === 2 && runs.every((run) => run.status === "succeeded"); }, 30_000); const secondPayload = gateway.getAgentPayloads()[1] ?? {}; expect(secondPayload.paperclip).toMatchObject({ wake: { commentIds: [comment2.id, comment3.id], latestCommentId: comment3.id, }, }); expect(String(secondPayload.message ?? "")).toContain("Second comment"); expect(String(secondPayload.message ?? "")).toContain("Third comment"); expect(String(secondPayload.message ?? "")).not.toContain("First comment"); } finally { gateway.releaseFirstWait(); await gateway.close(); } }, 45_000); it("queues exactly one follow-up run when an issue-bound run exits without a comment", async () => { const gateway = await createControlledGatewayServer(); const companyId = randomUUID(); const agentId = randomUUID(); const issueId = randomUUID(); const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; const heartbeat = heartbeatService(db); try { await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "Gateway Agent", role: "engineer", status: "idle", adapterType: "openclaw_gateway", adapterConfig: { url: gateway.url, headers: { "x-openclaw-token": "gateway-token", }, payloadTemplate: { message: "wake now", }, waitTimeoutMs: 2_000, }, runtimeConfig: {}, permissions: {}, }); await db.insert(issues).values({ id: issueId, companyId, title: "Require a comment", status: "todo", priority: "medium", assigneeAgentId: agentId, issueNumber: 1, identifier: `${issuePrefix}-1`, }); const firstRun = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId }, contextSnapshot: { issueId, taskId: issueId, wakeReason: "issue_assigned", }, requestedByActorType: "system", requestedByActorId: null, }); expect(firstRun).not.toBeNull(); await waitFor(() => gateway.getAgentPayloads().length === 1); const firstPayload = gateway.getAgentPayloads()[0] ?? {}; expect(firstPayload.paperclip).toMatchObject({ wake: { reason: "issue_assigned", issue: { id: issueId, identifier: `${issuePrefix}-1`, title: "Require a comment", status: "todo", priority: "medium", }, commentIds: [], }, }); expect(String(firstPayload.message ?? "")).toContain("## Paperclip Wake Payload"); expect(String(firstPayload.message ?? "")).toContain("Do not switch to another issue until you have handled this wake."); expect(String(firstPayload.message ?? "")).toContain(`${issuePrefix}-1 Require a comment`); gateway.releaseFirstWait(); await waitFor(async () => { const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)) .orderBy(asc(heartbeatRuns.createdAt)); return ( runs.length === 2 && runs.every((run) => run.status === "succeeded") && runs[0]?.issueCommentStatus === "retry_queued" && runs[1]?.issueCommentStatus === "retry_exhausted" ); }); const runs = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.agentId, agentId)) .orderBy(asc(heartbeatRuns.createdAt)); expect(runs).toHaveLength(2); expect(runs[0]?.issueCommentStatus).toBe("retry_queued"); expect(runs[1]?.retryOfRunId).toBe(runs[0]?.id); expect(runs[1]?.issueCommentStatus).toBe("retry_exhausted"); const comments = await db .select() .from(issueComments) .where(eq(issueComments.issueId, issueId)); expect(comments).toHaveLength(0); await waitFor(async () => { const wakeups = await db .select() .from(agentWakeupRequests) .where(and(eq(agentWakeupRequests.companyId, companyId), eq(agentWakeupRequests.agentId, agentId))); return wakeups.length >= 2; }); const payloads = gateway.getAgentPayloads(); expect(payloads).toHaveLength(2); expect(runs[1]?.contextSnapshot).toMatchObject({ retryReason: "missing_issue_comment", }); } finally { gateway.releaseFirstWait(); await gateway.close(); } }, 20_000); });