import { randomUUID } from "node:crypto"; import { eq, sql } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; import { activityLog, agents, agentRuntimeState, agentWakeupRequests, companies, companySkills, createDb, documentRevisions, documents, heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, issueRelations, issueTreeHolds, issues, } from "@paperclipai/db"; import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; import { heartbeatService } from "../services/heartbeat.ts"; import { runningProcesses } from "../adapters/index.ts"; const mockAdapterExecute = vi.hoisted(() => vi.fn(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Stale-queue invalidation test run.", provider: "test", model: "test-model", })), ); vi.mock("../adapters/index.ts", async () => { const actual = await vi.importActual("../adapters/index.ts"); return { ...actual, getServerAdapter: vi.fn(() => ({ supportsLocalAgentJwt: false, execute: mockAdapterExecute, })), }; }); const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; if (!embeddedPostgresSupport.supported) { console.warn( `Skipping embedded Postgres heartbeat stale-queue invalidation tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, ); } async function ensureIssueRelationsTable(db: ReturnType) { await db.execute(sql.raw(` CREATE TABLE IF NOT EXISTS "issue_relations" ( "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), "company_id" uuid NOT NULL, "issue_id" uuid NOT NULL, "related_issue_id" uuid NOT NULL, "type" text NOT NULL, "created_by_agent_id" uuid, "created_by_user_id" text, "created_at" timestamptz NOT NULL DEFAULT now(), "updated_at" timestamptz NOT NULL DEFAULT now() ); `)); } async function waitForCondition(fn: () => Promise, timeoutMs = 3_000) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (await fn()) return true; await new Promise((resolve) => setTimeout(resolve, 50)); } return fn(); } type SeedOptions = { agentName?: string; agentRole?: string; maxConcurrentRuns?: number; }; type SeedResult = { companyId: string; agentId: string; }; describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => { let db!: ReturnType; let heartbeat!: ReturnType; let tempDb: Awaited> | null = null; beforeAll(async () => { tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-stale-queue-"); db = createDb(tempDb.connectionString); heartbeat = heartbeatService(db); await ensureIssueRelationsTable(db); }, 20_000); afterEach(async () => { mockAdapterExecute.mockReset(); mockAdapterExecute.mockImplementation(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Stale-queue invalidation test run.", provider: "test", model: "test-model", })); runningProcesses.clear(); let idlePolls = 0; for (let attempt = 0; attempt < 100; attempt += 1) { const runs = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns); const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running"); if (!hasActiveRun) { idlePolls += 1; if (idlePolls >= 3) break; } else { idlePolls = 0; } await new Promise((resolve) => setTimeout(resolve, 50)); } await new Promise((resolve) => setTimeout(resolve, 50)); await db.delete(companySkills); await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); await db.delete(issueRelations); await db.delete(issueTreeHolds); await db.delete(issues); await db.delete(heartbeatRunEvents); await db.delete(activityLog); await db.delete(heartbeatRuns); await db.delete(agentWakeupRequests); await db.delete(agentRuntimeState); await db.delete(agents); await db.delete(companies); }); afterAll(async () => { await tempDb?.cleanup(); }); async function seedCompanyAndAgent(opts: SeedOptions = {}): Promise { const companyId = randomUUID(); const agentId = randomUUID(); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: opts.agentName ?? "ClaudeCoder", role: opts.agentRole ?? "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: opts.maxConcurrentRuns ?? 1, }, }, permissions: {}, }); return { companyId, agentId }; } async function seedQueuedRun(input: { companyId: string; agentId: string; issueId: string; wakeReason: string; contextExtras?: Record; invocationSource?: "assignment" | "automation"; }) { const wakeupRequestId = randomUUID(); const runId = randomUUID(); await db.insert(agentWakeupRequests).values({ id: wakeupRequestId, companyId: input.companyId, agentId: input.agentId, source: input.invocationSource ?? "assignment", triggerDetail: "system", reason: input.wakeReason, payload: { issueId: input.issueId }, status: "queued", }); await db.insert(heartbeatRuns).values({ id: runId, companyId: input.companyId, agentId: input.agentId, invocationSource: input.invocationSource ?? "assignment", triggerDetail: "system", status: "queued", wakeupRequestId, contextSnapshot: { issueId: input.issueId, wakeReason: input.wakeReason, ...(input.contextExtras ?? {}), }, }); await db .update(agentWakeupRequests) .set({ runId }) .where(eq(agentWakeupRequests.id, wakeupRequestId)); return { runId, wakeupRequestId }; } it("cancels queued runs when the issue assignee changes before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent({ agentName: "OriginalCoder" }); const replacementAgentId = randomUUID(); await db.insert(agents).values({ id: replacementAgentId, companyId, name: "ReplacementCoder", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Reassigned task", status: "in_progress", priority: "high", assigneeAgentId: replacementAgentId, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_assignee_changed"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_assignee_changed" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("assignee changed"); expect(mockAdapterExecute).not.toHaveBeenCalled(); }); it("cancels queued runs when the issue reaches a terminal status before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Already-completed task", status: "done", priority: "medium", assigneeAgentId: agentId, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_terminal_status"); expect(wakeup?.status).toBe("skipped"); expect(mockAdapterExecute).not.toHaveBeenCalled(); }); it("cancels queued in_review runs when the current participant changes before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const otherAgentId = randomUUID(); await db.insert(agents).values({ id: otherAgentId, companyId, name: "ReviewerAgent", role: "qa", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1 } }, permissions: {}, }); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "In-review task now owned by reviewer", status: "in_review", priority: "medium", assigneeAgentId: agentId, executionState: { status: "pending", currentStageId: randomUUID(), currentStageIndex: 0, currentStageType: "review", currentParticipant: { type: "agent", agentId: otherAgentId, userId: null }, returnAssignee: { type: "agent", agentId, userId: null }, reviewRequest: null, completedStageIds: [], lastDecisionId: null, lastDecisionOutcome: null, }, }); const { runId, wakeupRequestId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "cancelled"; }); const [run, wakeup] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, wakeupRequestId)) .then((rows) => rows[0] ?? null), ]); expect(run?.status).toBe("cancelled"); expect(run?.errorCode).toBe("issue_review_participant_changed"); expect(run?.resultJson).toMatchObject({ stopReason: "issue_review_participant_changed" }); expect(wakeup?.status).toBe("skipped"); expect(wakeup?.error).toContain("in-review participant changed"); expect(mockAdapterExecute).not.toHaveBeenCalled(); }); it("still runs comment-driven wakes on in_review issues even when the agent is no longer the current participant", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const otherAgentId = randomUUID(); await db.insert(agents).values({ id: otherAgentId, companyId, name: "ReviewerAgent", role: "qa", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1 } }, permissions: {}, }); const issueId = randomUUID(); const commentId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "In-review task with comment feedback", status: "in_review", priority: "medium", assigneeAgentId: agentId, executionState: { status: "pending", currentStageId: randomUUID(), currentStageIndex: 0, currentStageType: "review", currentParticipant: { type: "agent", agentId: otherAgentId, userId: null }, returnAssignee: { type: "agent", agentId, userId: null }, reviewRequest: null, completedStageIds: [], lastDecisionId: null, lastDecisionOutcome: null, }, }); await db.insert(issueComments).values({ id: commentId, companyId, issueId, authorAgentId: otherAgentId, body: "Review feedback comment", }); const { runId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_commented", invocationSource: "automation", contextExtras: { commentId, wakeCommentId: commentId, source: "issue.comment", }, }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const run = await db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); expect(run?.status).toBe("succeeded"); expect(run?.errorCode).toBeNull(); }); it("baseline: runs queued runs when the issue is in_progress with the same assignee", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const issueId = randomUUID(); await db.insert(issues).values({ id: issueId, companyId, title: "Still actionable", status: "in_progress", priority: "medium", assigneeAgentId: agentId, }); const { runId } = await seedQueuedRun({ companyId, agentId, issueId, wakeReason: "issue_assigned", }); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const run = await db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, runId)) .then((rows) => rows[0] ?? null); expect(run?.status).toBe("succeeded"); expect(run?.errorCode).toBeNull(); expect(mockAdapterExecute).toHaveBeenCalledTimes(1); }); });