import { randomUUID } from "node:crypto"; import { eq, sql } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; import { activityLog, agents, agentRuntimeState, agentWakeupRequests, companies, companySkills, createDb, documentRevisions, documents, heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, issueRelations, issueTreeHolds, issues, } from "@paperclipai/db"; import { ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY } from "@paperclipai/shared"; import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; import { MAX_TURN_CONTINUATION_RETRY_REASON, MAX_TURN_CONTINUATION_WAKE_REASON, heartbeatService, } from "../services/heartbeat.ts"; import { runningProcesses } from "../adapters/index.ts"; const mockAdapterExecute = vi.hoisted(() => vi.fn(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Stale-queue invalidation test run.", provider: "test", model: "test-model", })), ); vi.mock("../adapters/index.ts", async () => { const actual = await vi.importActual("../adapters/index.ts"); return { ...actual, getServerAdapter: vi.fn(() => ({ supportsLocalAgentJwt: false, execute: mockAdapterExecute, })), }; }); const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; if (!embeddedPostgresSupport.supported) { console.warn( `Skipping embedded Postgres heartbeat stale-queue invalidation tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, ); } async function ensureIssueRelationsTable(db: ReturnType) { await db.execute(sql.raw(` CREATE TABLE IF NOT EXISTS "issue_relations" ( "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), "company_id" uuid NOT NULL, "issue_id" uuid NOT NULL, "related_issue_id" uuid NOT NULL, "type" text NOT NULL, "created_by_agent_id" uuid, "created_by_user_id" text, "created_at" timestamptz NOT NULL DEFAULT now(), "updated_at" timestamptz NOT NULL DEFAULT now() ); `)); } async function waitForCondition(fn: () => Promise, timeoutMs = 3_000) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (await fn()) return true; await new Promise((resolve) => setTimeout(resolve, 50)); } return fn(); } async function cleanupHeartbeatInvalidationFixture(db: ReturnType) { for (let attempt = 0; attempt < 5; attempt += 1) { try { await db.delete(companySkills); await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); await db.delete(issueRelations); await db.delete(issueTreeHolds); await db.delete(issues); await db.delete(heartbeatRunEvents); await db.delete(activityLog); await db.delete(heartbeatRuns); await db.delete(agentWakeupRequests); await db.delete(agentRuntimeState); await db.delete(agents); await db.delete(companies); return; } catch (error) { const isLateCommentRace = error instanceof Error && error.message.includes("issue_comments_issue_id_issues_id_fk"); if (!isLateCommentRace || attempt === 4) { throw error; } // Heartbeat completion can write issue-thread comments shortly after the // run leaves queued/running. Retry the dependent deletes once those land. await new Promise((resolve) => setTimeout(resolve, 50)); } } } type SeedOptions = { agentName?: string; agentRole?: string; maxConcurrentRuns?: number; }; type SeedResult = { companyId: string; agentId: string; }; describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => { let db!: ReturnType; let heartbeat!: ReturnType; let tempDb: Awaited> | null = null; const countExecuteCallsForRun = (runId: string) => mockAdapterExecute.mock.calls.filter(([context]) => context?.runId === runId).length; beforeAll(async () => { tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-stale-queue-"); db = createDb(tempDb.connectionString); heartbeat = heartbeatService(db); await ensureIssueRelationsTable(db); }, 20_000); afterEach(async () => { mockAdapterExecute.mockReset(); mockAdapterExecute.mockImplementation(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Stale-queue invalidation test run.", provider: "test", model: "test-model", })); runningProcesses.clear(); let idlePolls = 0; for (let attempt = 0; attempt < 100; attempt += 1) { const runs = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns); const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running"); if (!hasActiveRun) { idlePolls += 1; if (idlePolls >= 3) break; } else { idlePolls = 0; } await new Promise((resolve) => setTimeout(resolve, 50)); } await new Promise((resolve) => setTimeout(resolve, 50)); await cleanupHeartbeatInvalidationFixture(db); }); afterAll(async () => { await tempDb?.cleanup(); }); async function seedCompanyAndAgent(opts: SeedOptions = {}): Promise { const companyId = randomUUID(); const agentId = randomUUID(); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: opts.agentName ?? "ClaudeCoder", role: opts.agentRole ?? "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: opts.maxConcurrentRuns ?? 1, }, }, permissions: {}, }); return { companyId, agentId }; } async function seedQueuedRun(input: { companyId: string; agentId: string; issueId: string; wakeReason: string; contextExtras?: Record; invocationSource?: "assignment" | "automation"; scheduledRetryReason?: string | null; }) { const wakeupRequestId = randomUUID(); const runId = randomUUID(); await db.insert(agentWakeupRequests).values({ id: wakeupRequestId, companyId: input.companyId, agentId: input.agentId, source: input.invocationSource ?? "assignment", triggerDetail: "system", reason: input.wakeReason, payload: { issueId: input.issueId }, status: "queued", }); await db.insert(heartbeatRuns).values({ id: runId, companyId: input.companyId, agentId: input.agentId, invocationSource: input.invocationSource ?? "assignment", triggerDetail: "system", status: "queued", wakeupRequestId, scheduledRetryReason: input.scheduledRetryReason ?? null, contextSnapshot: { issueId: input.issueId, wakeReason: input.wakeReason, ...(input.contextExtras ?? {}), }, }); await db .update(agentWakeupRequests) .set({ runId }) .where(eq(agentWakeupRequests.id, wakeupRequestId)); return { runId, wakeupRequestId }; } async function seedContinuationSummary(input: { companyId: string; issueId: string; agentId: string; body: string; }) { const documentId = randomUUID(); const revisionId = randomUUID(); await db.insert(documents).values({ id: documentId, companyId: input.companyId, title: "Continuation Summary", format: "markdown", latestBody: input.body, latestRevisionId: revisionId, latestRevisionNumber: 1, createdByAgentId: input.agentId, updatedByAgentId: input.agentId, }); await db.insert(documentRevisions).values({ id: revisionId, companyId: input.companyId, documentId, revisionNumber: 1, title: "Continuation Summary", format: "markdown", body: input.body, createdByAgentId: input.agentId, }); await db.insert(issueDocuments).values({ companyId: input.companyId, issueId: input.issueId, documentId, key: ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY, }); } it("cancels queued runs when the issue assignee changes before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent({ agentName: "OriginalCoder" }); const replacementAgentId = randomUUID(); await db.insert(agents).values({ id: replacementAgentId, companyId, name: "ReplacementCoder", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Reassigned task", status: "in_progress", priority: "high", assigneeAgentId: replacementAgentId, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_assignee_changed"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_assignee_changed" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("assignee changed"); expect(countExecuteCallsForRun(runId)).toBe(0); }); it("cancels queued runs when the issue reaches a terminal status before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Already-completed task", status: "done", priority: "medium", assigneeAgentId: agentId, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_terminal_status"); expect(wakeup?.status).toBe("skipped"); expect(countExecuteCallsForRun(runId)).toBe(0); }); it("cancels queued max-turn continuations when the issue is no longer in_progress before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Parked max-turn continuation", status: "blocked", priority: "medium", assigneeAgentId: agentId, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, invocationSource: "automation", scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, contextExtras: { retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, }, }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_not_in_progress"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_not_in_progress" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("no longer in_progress"); expect(countExecuteCallsForRun(runId)).toBe(0); }); it("cancels queued max-turn continuations when another continuation owns the issue lock", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); const lockOwnerRunId = randomUUID(); await db.insert(heartbeatRuns).values({ id: lockOwnerRunId, companyId, agentId, invocationSource: "automation", triggerDetail: "system", status: "scheduled_retry", scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, scheduledRetryAttempt: 1, scheduledRetryAt: new Date("2026-04-20T12:00:00.000Z"), contextSnapshot: { issueId, wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, }, }); await db.insert(issues).values({ id: issueId, companyId, title: "Duplicate max-turn continuation", status: "in_progress", priority: "medium", assigneeAgentId: agentId, executionRunId: lockOwnerRunId, executionAgentNameKey: "claudecoder", executionLockedAt: new Date("2026-04-20T11:59:00.000Z"), }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, invocationSource: "automation", scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, contextExtras: { retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, }, }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup, issue] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), db .select({ executionRunId: issues.executionRunId }) .from(issues) .where(eq(issues.id, issueId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_execution_lock_changed"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_execution_lock_changed" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("execution lock"); expect(issue?.executionRunId).toBe(lockOwnerRunId); expect(countExecuteCallsForRun(runId)).toBe(0); }); it("cancels queued in_review runs when the current participant changes before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const otherAgentId = randomUUID(); await db.insert(agents).values({ id: otherAgentId, companyId, name: "ReviewerAgent", role: "qa", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1 } }, permissions: {}, }); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "In-review task now owned by reviewer", status: "in_review", priority: "medium", assigneeAgentId: agentId, executionState: { status: "pending", currentStageId: randomUUID(), currentStageIndex: 0, currentStageType: "review", currentParticipant: { type: "agent", agentId: otherAgentId, userId: null }, returnAssignee: { type: "agent", agentId, userId: null }, reviewRequest: null, completedStageIds: [], lastDecisionId: null, lastDecisionOutcome: null, }, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_review_participant_changed"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_review_participant_changed" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("in-review participant changed"); expect(countExecuteCallsForRun(runId)).toBe(0); }); it("still runs comment-driven wakes on in_review issues even when the agent is no longer the current participant", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const otherAgentId = randomUUID(); await db.insert(agents).values({ id: otherAgentId, companyId, name: "ReviewerAgent", role: "qa", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1 } }, permissions: {}, }); const issueId = randomUUID(); const commentId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "In-review task with comment feedback", status: "in_review", priority: "medium", assigneeAgentId: agentId, executionState: { status: "pending", currentStageId: randomUUID(), currentStageIndex: 0, currentStageType: "review", currentParticipant: { type: "agent", agentId: otherAgentId, userId: null }, returnAssignee: { type: "agent", agentId, userId: null }, reviewRequest: null, completedStageIds: [], lastDecisionId: null, lastDecisionOutcome: null, }, }); await db.insert(issueComments).values({ id: commentId, companyId, issueId, authorAgentId: otherAgentId, body: "Review feedback comment", }); const { runId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_commented", invocationSource: "automation", contextExtras: { commentId, wakeCommentId: commentId, source: "issue.comment", }, }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const run = await db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); expect(run?.status).toBe("succeeded"); expect(run?.errorCode).toBeNull(); }); it("baseline: runs queued runs when the issue is in_progress with the same assignee", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Still actionable", status: "in_progress", priority: "medium", assigneeAgentId: agentId, }); const { runId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const run = await db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); expect(run?.status).toBe("succeeded"); expect(run?.errorCode).toBeNull(); expect(countExecuteCallsForRun(runId)).toBe(1); }); it("cancels queued continuation recovery when the continuation summary parks executor work for review", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Implementation parked for review", status: "in_progress", priority: "medium", assigneeAgentId: agentId, }); await seedContinuationSummary({ companyId, issueId, agentId, body: [ "# Continuation Summary", "", "## Next Action", "", "- Wait for reviewer feedback or approval before continuing executor work.", ].join("\n"), }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_continuation_needed", invocationSource: "automation", contextExtras: { retryReason: "issue_continuation_needed", }, }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_continuation_waiting_on_review"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_continuation_waiting_on_review" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("continuation summary says the executor should wait"); expect(countExecuteCallsForRun(runId)).toBe(0); }); });