import { randomUUID } from "node:crypto"; import { and, eq, sql } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest"; import { activityLog, agents, agentRuntimeState, agentWakeupRequests, companySkills, companies, createDb, documentRevisions, documents, heartbeatRunEvents, heartbeatRuns, issueComments, issueDocuments, issueRelations, issues, } from "@paperclipai/db"; import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; import { heartbeatService } from "../services/heartbeat.ts"; import { runningProcesses } from "../adapters/index.ts"; const mockAdapterExecute = vi.hoisted(() => vi.fn(async () => ({ exitCode: 0, signal: null, timedOut: false, errorMessage: null, summary: "Dependency-aware heartbeat test run.", provider: "test", model: "test-model", })), ); vi.mock("../adapters/index.ts", async () => { const actual = await vi.importActual("../adapters/index.ts"); return { ...actual, getServerAdapter: vi.fn(() => ({ supportsLocalAgentJwt: false, execute: mockAdapterExecute, })), }; }); const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport(); const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip; if (!embeddedPostgresSupport.supported) { console.warn( `Skipping embedded Postgres heartbeat dependency scheduling tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`, ); } async function ensureIssueRelationsTable(db: ReturnType) { await db.execute(sql.raw(` CREATE TABLE IF NOT EXISTS "issue_relations" ( "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), "company_id" uuid NOT NULL, "issue_id" uuid NOT NULL, "related_issue_id" uuid NOT NULL, "type" text NOT NULL, "created_by_agent_id" uuid, "created_by_user_id" text, "created_at" timestamptz NOT NULL DEFAULT now(), "updated_at" timestamptz NOT NULL DEFAULT now() ); `)); } async function waitForCondition(fn: () => Promise, timeoutMs = 3_000) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (await fn()) return true; await new Promise((resolve) => setTimeout(resolve, 50)); } return fn(); } describeEmbeddedPostgres("heartbeat dependency-aware queued run selection", () => { let db!: ReturnType; let heartbeat!: ReturnType; let tempDb: Awaited> | null = null; beforeAll(async () => { tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-dependency-scheduling-"); db = createDb(tempDb.connectionString); heartbeat = heartbeatService(db); await ensureIssueRelationsTable(db); }, 20_000); afterEach(async () => { vi.clearAllMocks(); runningProcesses.clear(); let idlePolls = 0; for (let attempt = 0; attempt < 100; attempt += 1) { const runs = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns); const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running"); if (!hasActiveRun) { idlePolls += 1; if (idlePolls >= 3) break; } else { idlePolls = 0; } await new Promise((resolve) => setTimeout(resolve, 50)); } await new Promise((resolve) => setTimeout(resolve, 50)); await db.delete(activityLog); await db.delete(companySkills); await db.delete(issueComments); await db.delete(issueDocuments); await db.delete(documentRevisions); await db.delete(documents); await db.delete(issueRelations); await db.delete(issues); await db.delete(heartbeatRunEvents); await db.delete(heartbeatRuns); await db.delete(agentWakeupRequests); await db.delete(agentRuntimeState); await db.delete(agents); await db.delete(companies); }); afterAll(async () => { await tempDb?.cleanup(); }); it("keeps blocked descendants queued until their blockers resolve", async () => { const companyId = randomUUID(); const agentId = randomUUID(); const blockerId = randomUUID(); const blockedIssueId = randomUUID(); const readyIssueId = randomUUID(); await db.insert(companies).values({ id: companyId, name: "Paperclip", issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, requireBoardApprovalForNewAgents: false, }); await db.insert(agents).values({ id: agentId, companyId, name: "CodexCoder", role: "engineer", status: "active", adapterType: "codex_local", adapterConfig: {}, runtimeConfig: { heartbeat: { wakeOnDemand: true, maxConcurrentRuns: 1, }, }, permissions: {}, }); await db.insert(issues).values([ { id: blockerId, companyId, title: "Mission 0", status: "todo", priority: "high", }, { id: blockedIssueId, companyId, title: "Mission 2", status: "todo", priority: "medium", assigneeAgentId: agentId, }, { id: readyIssueId, companyId, title: "Mission 1", status: "todo", priority: "critical", assigneeAgentId: agentId, }, ]); await db.insert(issueRelations).values({ companyId, issueId: blockerId, relatedIssueId: blockedIssueId, type: "blocks", }); const blockedWake = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: blockedIssueId }, contextSnapshot: { issueId: blockedIssueId, wakeReason: "issue_assigned" }, }); expect(blockedWake).not.toBeNull(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, blockedWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "queued"; }); const readyWake = await heartbeat.wakeup(agentId, { source: "assignment", triggerDetail: "system", reason: "issue_assigned", payload: { issueId: readyIssueId }, contextSnapshot: { issueId: readyIssueId, wakeReason: "issue_assigned" }, }); expect(readyWake).not.toBeNull(); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, readyWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const [blockedRun, readyRun] = await Promise.all([ db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, blockedWake!.id)).then((rows) => rows[0] ?? null), db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, readyWake!.id)).then((rows) => rows[0] ?? null), ]); expect(blockedRun?.status).toBe("queued"); expect(readyRun?.status).toBe("succeeded"); await db .update(issues) .set({ status: "done", updatedAt: new Date() }) .where(eq(issues.id, blockerId)); await heartbeat.wakeup(agentId, { source: "automation", triggerDetail: "system", reason: "issue_blockers_resolved", payload: { issueId: blockedIssueId, resolvedBlockerIssueId: blockerId }, contextSnapshot: { issueId: blockedIssueId, wakeReason: "issue_blockers_resolved", resolvedBlockerIssueId: blockerId, }, }); await waitForCondition(async () => { const run = await db .select({ status: heartbeatRuns.status }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, blockedWake!.id)) .then((rows) => rows[0] ?? null); return run?.status === "succeeded"; }); const promotedBlockedRun = await db .select({ id: heartbeatRuns.id, status: heartbeatRuns.status, }) .from(heartbeatRuns) .where(eq(heartbeatRuns.id, blockedWake!.id)) .then((rows) => rows[0] ?? null); const blockedWakeRequestCount = await db .select({ count: sql`count(*)::int` }) .from(agentWakeupRequests) .where( and( eq(agentWakeupRequests.agentId, agentId), sql`${agentWakeupRequests.payload} ->> 'issueId' = ${blockedIssueId}`, ), ) .then((rows) => rows[0]?.count ?? 0); expect(promotedBlockedRun?.status).toBe("succeeded"); expect(blockedWakeRequestCount).toBeGreaterThanOrEqual(2); }); });