import { randomUUID } from "node:crypto"; import { and, eq, sql } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; import { activityLog, agents, agentRuntimeState, agentWakeupRequests, companySkills, companies, createDb, documentRevisions, documents, heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, issueRelations, issueTreeHolds, issues, } from "@paperclipai/db"; import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; import { heartbeatService } from "../services/heartbeat.ts"; import { runningProcesses } from "../adapters/index.ts"; const mockAdapterExecute = vi.hoisted(() => vi.fn(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Dependency-aware heartbeat test run.", provider: "test", model: "test-model", })), ); vi.mock("../adapters/index.ts", async () => { const actual = await vi.importActual("../adapters/index.ts"); return { ...actual, getServerAdapter: vi.fn(() => ({ supportsLocalAgentJwt: false, execute: mockAdapterExecute, })), }; }); const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; if (!embeddedPostgresSupport.supported) { console.warn( `Skipping embedded Postgres heartbeat dependency scheduling tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, ); } async function ensureIssueRelationsTable(db: ReturnType) { await db.execute(sql.raw(` CREATE TABLE IF NOT EXISTS "issue_relations" ( "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), "company_id" uuid NOT NULL, "issue_id" uuid NOT NULL, "related_issue_id" uuid NOT NULL, "type" text NOT NULL, "created_by_agent_id" uuid, "created_by_user_id" text, "created_at" timestamptz NOT NULL DEFAULT now(), "updated_at" timestamptz NOT NULL DEFAULT now() ); `)); } async function waitForCondition(fn: () => Promise, timeoutMs = 3_000) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (await fn()) return true; await new Promise((resolve) => setTimeout(resolve, 50)); } return fn(); } describeEmbeddedPostgres("heartbeat dependency-aware queued run selection", () => { let db!: ReturnType; let heartbeat!: ReturnType; let tempDb: Awaited> | null = null; beforeAll(async () => { tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-dependency-scheduling-"); db = createDb(tempDb.connectionString); heartbeat = heartbeatService(db); await ensureIssueRelationsTable(db); }, 20_000); afterEach(async () => { mockAdapterExecute.mockReset(); mockAdapterExecute.mockImplementation(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Dependency-aware heartbeat test run.", provider: "test", model: "test-model", })); runningProcesses.clear(); let idlePolls = 0; for (let attempt = 0; attempt < 100; attempt += 1) { const runs = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns); const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running"); if (!hasActiveRun) { idlePolls += 1; if (idlePolls >= 3) break; } else { idlePolls = 0; } await new Promise((resolve) => setTimeout(resolve, 50)); } await new Promise((resolve) => setTimeout(resolve, 50)); await db.delete(activityLog); await db.delete(companySkills); await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); await db.delete(issueRelations); await db.delete(issueTreeHolds); await db.delete(issues); await db.delete(heartbeatRunEvents); await db.delete(activityLog); await db.delete(heartbeatRuns); await db.delete(agentWakeupRequests); await db.delete(agentRuntimeState); await db.delete(agents); await db.delete(companies); }); afterAll(async () => { await tempDb?.cleanup(); }); it("keeps blocked descendants idle until their blockers resolve", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const blockerId = randomUUID(); const blockedIssueId = randomUUID(); const readyIssueId = randomUUID(); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); await db.insert(issues).values([ { id: blockerId, companyId, title: "Mission 0", status: "todo", priority: "high", }, { id: blockedIssueId, companyId, title: "Mission 2", status: "todo", priority: "medium", assigneeAgentId: agentId, }, { id: readyIssueId, companyId, title: "Mission 1", status: "todo", priority: "critical", assigneeAgentId: agentId, }, ]); await db.insert(issueRelations).values({ companyId, issueId: blockerId, relatedIssueId: blockedIssueId, type: "blocks", }); const blockedWake = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: blockedIssueId }, contextSnapshot: { issueId: blockedIssueId, wakeReason: "issue_assigned" }, }); expect(blockedWake).toBeNull(); const blockedWakeRequest = await waitForCondition(async () => { const wakeup = await db .select({ status: agentWakeupRequests.status, reason: agentWakeupRequests.reason, }) .from(agentWakeupRequests) .where( and( eq(agentWakeupRequests.agentId, agentId), sql`${agentWakeupRequests.payload} ->> 'issueId' = ${blockedIssueId}`, ), ) .orderBy(agentWakeupRequests.requestedAt) .then((rows) => rows[0] ?? null); return Boolean( wakeup && wakeup.status === "skipped" && wakeup.reason === "issue_dependencies_blocked", ); }); expect(blockedWakeRequest).toBe(true); const blockedRunsBeforeResolution = await db .select({ count: sql`count(*)::int` }) .from(heartbeatRuns) .where(sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${blockedIssueId}`) .then((rows) => rows[0]?.count ?? 0); expect(blockedRunsBeforeResolution).toBe(0); const interactionWake = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_commented", payload: { issueId: blockedIssueId, commentId: randomUUID() }, contextSnapshot: { issueId: blockedIssueId, wakeReason: "issue_commented", }, }); expect(interactionWake).not.toBeNull(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, interactionWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const interactionRun = await db .select({ status: heartbeatRuns.status, contextSnapshot: heartbeatRuns.contextSnapshot, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, interactionWake!.id)) .then((rows) => rows[0] ?? null); expect(interactionRun?.status).toBe("succeeded"); expect(interactionRun?.contextSnapshot).toMatchObject({ dependencyBlockedInteraction: true, unresolvedBlockerIssueIds: [blockerId], }); const readyWake = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: readyIssueId }, contextSnapshot: { issueId: readyIssueId, wakeReason: "issue_assigned" }, }); expect(readyWake).not.toBeNull(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, readyWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const readyRun = await db .select() .from(heartbeatRuns) .where(eq(heartbeatRuns.id, readyWake!.id)) .then((rows) => rows[0] ?? null); expect(readyRun?.status).toBe("succeeded"); await db .update(issues) .set({ status: "done", updatedAt: new Date() }) .where(eq(issues.id, blockerId)); const promotedWake = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_blockers_resolved", payload: { issueId: blockedIssueId, resolvedBlockerIssueId: blockerId }, contextSnapshot: { issueId: blockedIssueId, wakeReason: "issue_blockers_resolved", resolvedBlockerIssueId: blockerId, }, }); expect(promotedWake).not.toBeNull(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, promotedWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const promotedBlockedRun = await db .select({ id: heartbeatRuns.id, status: heartbeatRuns.status, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, promotedWake!.id)) .then((rows) => rows[0] ?? null); const blockedWakeRequestCount = await db .select({ count: sql`count(*)::int` }) .from(agentWakeupRequests) .where( and( eq(agentWakeupRequests.agentId, agentId), sql`${agentWakeupRequests.payload} ->> 'issueId' = ${blockedIssueId}`, ), ) .then((rows) => rows[0]?.count ?? 0); expect(promotedBlockedRun?.status).toBe("succeeded"); expect(blockedWakeRequestCount).toBeGreaterThanOrEqual(2); }); it("honors maxConcurrentRuns 1 by leaving a second assignment wake queued", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const firstIssueId = randomUUID(); const secondIssueId = randomUUID(); let finishFirstRun!: () => void; const firstRunFinished = new Promise((resolve) => { finishFirstRun = resolve; }); mockAdapterExecute.mockImplementationOnce(async () => { await firstRunFinished; return { exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "First assignment run completed.", provider: "test", model: "test-model", }; }); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); await db.insert(issues).values([ { id: firstIssueId, companyId, title: "First assignment", status: "todo", priority: "high", assigneeAgentId: agentId, }, { id: secondIssueId, companyId, title: "Second assignment", status: "todo", priority: "high", assigneeAgentId: agentId, }, ]); try { const firstWake = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: firstIssueId }, contextSnapshot: { issueId: firstIssueId, wakeReason: "issue_assigned" }, }); expect(firstWake).not.toBeNull(); const firstRunStarted = await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, firstWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "running"; }); expect(firstRunStarted).toBe(true); const firstAdapterStarted = await waitForCondition(async () => mockAdapterExecute.mock.calls.length === 1); expect(firstAdapterStarted).toBe(true); const secondWake = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: secondIssueId }, contextSnapshot: { issueId: secondIssueId, wakeReason: "issue_assigned" }, }); expect(secondWake).not.toBeNull(); const secondRunWhileFirstRunning = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, secondWake!.id)) .then((rows) => rows[0] ?? null); expect(secondRunWhileFirstRunning?.status).toBe("queued"); expect(mockAdapterExecute).toHaveBeenCalledTimes(1); finishFirstRun(); const secondRunSucceeded = await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, secondWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); expect(secondRunSucceeded).toBe(true); expect(mockAdapterExecute).toHaveBeenCalledTimes(2); } finally { finishFirstRun(); } }); it("cancels stale queued runs when issue blockers are still unresolved", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const blockerId = randomUUID(); const blockedIssueId = randomUUID(); const readyIssueId = randomUUID(); const blockedWakeupRequestId = randomUUID(); const readyWakeupRequestId = randomUUID(); const blockedRunId = randomUUID(); const readyRunId = randomUUID(); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "QAChecker", role: "qa", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 2, }, }, permissions: {}, }); await db.insert(issues).values([ { id: blockerId, companyId, title: "Security review", status: "blocked", priority: "high", }, { id: blockedIssueId, companyId, title: "QA validation", status: "blocked", priority: "medium", assigneeAgentId: agentId, }, { id: readyIssueId, companyId, title: "Ready QA task", status: "todo", priority: "low", assigneeAgentId: agentId, }, ]); await db.insert(issueRelations).values({ companyId, issueId: blockerId, relatedIssueId: blockedIssueId, type: "blocks", }); await db.insert(agentWakeupRequests).values([ { id: blockedWakeupRequestId, companyId, agentId, source: "automation", triggerDetail: "system", reason: "transient_failure_retry", payload: { issueId: blockedIssueId }, status: "queued", }, { id: readyWakeupRequestId, companyId, agentId, source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: readyIssueId }, status: "queued", }, ]); await db.insert(heartbeatRuns).values([ { id: blockedRunId, companyId, agentId, invocationSource: "automation", triggerDetail: "system", status: "queued", wakeupRequestId: blockedWakeupRequestId, contextSnapshot: { issueId: blockedIssueId, wakeReason: "transient_failure_retry", }, }, { id: readyRunId, companyId, agentId, invocationSource: "assignment", triggerDetail: "system", status: "queued", wakeupRequestId: readyWakeupRequestId, contextSnapshot: { issueId: readyIssueId, wakeReason: "issue_assigned", }, }, ]); await db .update(agentWakeupRequests) .set({ runId: blockedRunId }) .where(eq(agentWakeupRequests.id, blockedWakeupRequestId)); await db .update(agentWakeupRequests) .set({ runId: readyRunId }) .where(eq(agentWakeupRequests.id, readyWakeupRequestId)); await db .update(issues) .set({ executionRunId: blockedRunId, executionAgentNameKey: "qa-checker", executionLockedAt: new Date(), }) .where(eq(issues.id, blockedIssueId)); await heartbeat.resumeQueuedRuns(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, readyRunId)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const [blockedRun, blockedWakeup, blockedIssue, readyRun] = await Promise.all([ db .select({ status: heartbeatRuns.status, errorCode: heartbeatRuns.errorCode, finishedAt: heartbeatRuns.finishedAt, resultJson: heartbeatRuns.resultJson, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, blockedRunId)) .then((rows) => rows[0] ?? null), db .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error, }) .from(agentWakeupRequests) .where(eq(agentWakeupRequests.id, blockedWakeupRequestId)) .then((rows) => rows[0] ?? null), db .select({ executionRunId: issues.executionRunId, executionAgentNameKey: issues.executionAgentNameKey, executionLockedAt: issues.executionLockedAt, }) .from(issues) .where(eq(issues.id, blockedIssueId)) .then((rows) => rows[0] ?? null), db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, readyRunId)) .then((rows) => rows[0] ?? null), ]); expect(blockedRun?.status).toBe("cancelled"); expect(blockedRun?.errorCode).toBe("issue_dependencies_blocked"); expect(blockedRun?.finishedAt).toBeTruthy(); expect(blockedRun?.resultJson).toMatchObject({ stopReason: "issue_dependencies_blocked" }); expect(blockedWakeup?.status).toBe("skipped"); expect(blockedWakeup?.error).toContain("dependencies are still blocked"); expect(blockedIssue).toMatchObject({ executionRunId: null, executionAgentNameKey: null, executionLockedAt: null, }); expect(readyRun?.status).toBe("succeeded"); expect(mockAdapterExecute).toHaveBeenCalledTimes(1); }); it("suppresses normal wakeups while allowing comment interaction wakes under a pause hold", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const rootIssueId = randomUUID(); const issueChain = Array.from({ length: 17 }, () => randomUUID()); const deepDescendantIssueId = issueChain.at(-1)!; await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "SecurityEngineer", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); await db.insert(issues).values([ { id: rootIssueId, companyId, title: "Paused root", status: "todo", priority: "medium", assigneeAgentId: agentId, }, ...issueChain.map((issueId, index) => ({ id: issueId, companyId, parentId: index === 0 ? rootIssueId : issueChain[index - 1], title: `Paused desc ${index + 1}`, status: "todo", priority: "medium", assigneeAgentId: agentId, })), ]); const [hold] = await db .insert(issueTreeHolds) .values({ companyId, rootIssueId, mode: "pause", status: "active", reason: "security test hold", releasePolicy: { strategy: "manual" }, }) .returning(); const blockedWake = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_blockers_resolved", payload: { issueId: deepDescendantIssueId }, contextSnapshot: { issueId: deepDescendantIssueId, wakeReason: "issue_blockers_resolved" }, }); expect(blockedWake).toBeNull(); const skippedWake = await db .select({ status: agentWakeupRequests.status, reason: agentWakeupRequests.reason, }) .from(agentWakeupRequests) .where(sql`${agentWakeupRequests.payload} ->> 'issueId' = ${deepDescendantIssueId}`) .then((rows) => rows[0] ?? null); expect(skippedWake).toMatchObject({ status: "skipped", reason: "issue_tree_hold_active" }); const childCommentId = randomUUID(); await db.insert(issueComments).values({ id: childCommentId, companyId, issueId: deepDescendantIssueId, authorUserId: "board-user", body: "Please respond while this hold is active.", }); const forgedChildCommentWake = await heartbeat.wakeup(agentId, { source: "on_demand", triggerDetail: "manual", reason: "issue_commented", payload: { issueId: deepDescendantIssueId, commentId: childCommentId }, requestedByActorType: "agent", requestedByActorId: agentId, }); expect(forgedChildCommentWake).toBeNull(); const childCommentWake = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_commented", payload: { issueId: deepDescendantIssueId, commentId: childCommentId }, requestedByActorType: "user", requestedByActorId: "board-user", contextSnapshot: { issueId: deepDescendantIssueId, commentId: childCommentId, wakeCommentId: childCommentId, wakeReason: "issue_commented", source: "issue.comment", }, }); expect(childCommentWake).not.toBeNull(); const childRun = await db .select({ contextSnapshot: heartbeatRuns.contextSnapshot }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, childCommentWake!.id)) .then((rows) => rows[0] ?? null); expect(childRun?.contextSnapshot).toMatchObject({ treeHoldInteraction: true, activeTreeHold: { holdId: hold.id, rootIssueId, mode: "pause", interaction: true, }, }); }); it("allows comment interaction wakes when a legacy hold has a full_pause note", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const rootIssueId = randomUUID(); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "SecurityEngineer", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); await db.insert(issues).values({ id: rootIssueId, companyId, title: "Paused root", status: "todo", priority: "medium", assigneeAgentId: agentId, }); await db.insert(issueTreeHolds).values({ companyId, rootIssueId, mode: "pause", status: "active", reason: "full pause", releasePolicy: { strategy: "manual", note: "full_pause" }, }); const rootCommentId = randomUUID(); await db.insert(issueComments).values({ id: rootCommentId, companyId, issueId: rootIssueId, authorUserId: "board-user", body: "Please respond while this hold is active.", }); const rootCommentWake = await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_commented", payload: { issueId: rootIssueId, commentId: rootCommentId }, requestedByActorType: "user", requestedByActorId: "board-user", contextSnapshot: { issueId: rootIssueId, commentId: rootCommentId, wakeCommentId: rootCommentId, wakeReason: "issue_commented", source: "issue.comment", }, }); expect(rootCommentWake).not.toBeNull(); const rootRun = await db .select({ contextSnapshot: heartbeatRuns.contextSnapshot }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, rootCommentWake!.id)) .then((rows) => rows[0] ?? null); expect(rootRun?.contextSnapshot).toMatchObject({ treeHoldInteraction: true, activeTreeHold: { rootIssueId, mode: "pause", interaction: true, }, }); }); });