mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-14 01:50:39 +09:00
[codex] Make heartbeat scheduling blocker-aware (#4157)
## Thinking Path > - Paperclip orchestrates AI agents through issue-driven heartbeats, checkouts, and wake scheduling. > - This change sits in the server heartbeat and issue services that decide which queued runs are allowed to start. > - Before this branch, queued heartbeats could be selected even when their issue still had unresolved blocker relationships. > - That let blocked descendant work compete with actually-ready work and risked auto-checking out issues that were not dependency-ready. > - This pull request teaches the scheduler and checkout path to consult issue dependency readiness before claiming queued runs. > - It also exposes dependency readiness in the agent inbox so agents can see which assigned issues are still blocked. > - The result is that heartbeat execution follows the DAG of blocked dependencies instead of waking work out of order. ## What Changed - Added `IssueDependencyReadiness` helpers to `issueService`, including unresolved blocker lookup for single issues and bulk issue lists. - Prevented issue checkout and `in_progress` transitions when unresolved blockers still exist. - Made heartbeat queued-run claiming and prioritization dependency-aware so ready work starts before blocked descendants. - Included dependency readiness fields in `/api/agents/me/inbox-lite` for agent heartbeat selection. - Added regression coverage for dependency-aware heartbeat promotion and issue-service participation filtering. ## Verification - `pnpm run preflight:workspace-links` - `pnpm exec vitest run server/src/__tests__/heartbeat-dependency-scheduling.test.ts server/src/__tests__/issues-service.test.ts` - On this host, the Vitest command passed, but the embedded-Postgres portions of those files were skipped because `@embedded-postgres/darwin-x64` is not installed. ## Risks - Scheduler ordering now prefers dependency-ready runs, so any hidden assumptions about strict FIFO ordering could surface in edge cases. - The new guardrails reject checkout or `in_progress` transitions for blocked issues; callers depending on the old permissive behavior would now get `422` errors. - Local verification did not execute the embedded-Postgres integration paths on this macOS host because the platform binary package was missing. > I checked `ROADMAP.md`; this is a targeted execution/scheduling fix and does not duplicate planned roadmap feature work. ## Model Used - OpenAI Codex via the Paperclip `codex_local` adapter in this workspace. Exact backend model ID is not surfaced in the runtime here; tool-enabled coding agent with terminal execution and repository editing capabilities. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge
This commit is contained in:
parent
1bf2424377
commit
1266954a4e
5 changed files with 581 additions and 4 deletions
288
server/src/__tests__/heartbeat-dependency-scheduling.test.ts
Normal file
288
server/src/__tests__/heartbeat-dependency-scheduling.test.ts
Normal file
|
|
@ -0,0 +1,288 @@
|
||||||
|
import { randomUUID } from "node:crypto";
|
||||||
|
import { and, eq, sql } from "drizzle-orm";
|
||||||
|
import { afterAll, afterEach, beforeAll, describe, expect, it, vi } from "vitest";
|
||||||
|
import {
|
||||||
|
activityLog,
|
||||||
|
agents,
|
||||||
|
agentRuntimeState,
|
||||||
|
agentWakeupRequests,
|
||||||
|
companySkills,
|
||||||
|
companies,
|
||||||
|
createDb,
|
||||||
|
documentRevisions,
|
||||||
|
documents,
|
||||||
|
heartbeatRunEvents,
|
||||||
|
heartbeatRuns,
|
||||||
|
issueComments,
|
||||||
|
issueDocuments,
|
||||||
|
issueRelations,
|
||||||
|
issues,
|
||||||
|
} from "@paperclipai/db";
|
||||||
|
import {
|
||||||
|
getEmbeddedPostgresTestSupport,
|
||||||
|
startEmbeddedPostgresTestDatabase,
|
||||||
|
} from "./helpers/embedded-postgres.js";
|
||||||
|
import { heartbeatService } from "../services/heartbeat.ts";
|
||||||
|
import { runningProcesses } from "../adapters/index.ts";
|
||||||
|
|
||||||
|
const mockAdapterExecute = vi.hoisted(() =>
|
||||||
|
vi.fn(async () => ({
|
||||||
|
exitCode: 0,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: null,
|
||||||
|
summary: "Dependency-aware heartbeat test run.",
|
||||||
|
provider: "test",
|
||||||
|
model: "test-model",
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
|
||||||
|
vi.mock("../adapters/index.ts", async () => {
|
||||||
|
const actual = await vi.importActual<typeof import("../adapters/index.ts")>("../adapters/index.ts");
|
||||||
|
return {
|
||||||
|
...actual,
|
||||||
|
getServerAdapter: vi.fn(() => ({
|
||||||
|
supportsLocalAgentJwt: false,
|
||||||
|
execute: mockAdapterExecute,
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
|
||||||
|
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
|
||||||
|
|
||||||
|
if (!embeddedPostgresSupport.supported) {
|
||||||
|
console.warn(
|
||||||
|
`Skipping embedded Postgres heartbeat dependency scheduling tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function ensureIssueRelationsTable(db: ReturnType<typeof createDb>) {
|
||||||
|
await db.execute(sql.raw(`
|
||||||
|
CREATE TABLE IF NOT EXISTS "issue_relations" (
|
||||||
|
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
"company_id" uuid NOT NULL,
|
||||||
|
"issue_id" uuid NOT NULL,
|
||||||
|
"related_issue_id" uuid NOT NULL,
|
||||||
|
"type" text NOT NULL,
|
||||||
|
"created_by_agent_id" uuid,
|
||||||
|
"created_by_user_id" text,
|
||||||
|
"created_at" timestamptz NOT NULL DEFAULT now(),
|
||||||
|
"updated_at" timestamptz NOT NULL DEFAULT now()
|
||||||
|
);
|
||||||
|
`));
|
||||||
|
}
|
||||||
|
|
||||||
|
async function waitForCondition(fn: () => Promise<boolean>, timeoutMs = 3_000) {
|
||||||
|
const deadline = Date.now() + timeoutMs;
|
||||||
|
while (Date.now() < deadline) {
|
||||||
|
if (await fn()) return true;
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
}
|
||||||
|
return fn();
|
||||||
|
}
|
||||||
|
|
||||||
|
describeEmbeddedPostgres("heartbeat dependency-aware queued run selection", () => {
|
||||||
|
let db!: ReturnType<typeof createDb>;
|
||||||
|
let heartbeat!: ReturnType<typeof heartbeatService>;
|
||||||
|
let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
tempDb = await startEmbeddedPostgresTestDatabase("paperclip-heartbeat-dependency-scheduling-");
|
||||||
|
db = createDb(tempDb.connectionString);
|
||||||
|
heartbeat = heartbeatService(db);
|
||||||
|
await ensureIssueRelationsTable(db);
|
||||||
|
}, 20_000);
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
runningProcesses.clear();
|
||||||
|
let idlePolls = 0;
|
||||||
|
for (let attempt = 0; attempt < 100; attempt += 1) {
|
||||||
|
const runs = await db
|
||||||
|
.select({ status: heartbeatRuns.status })
|
||||||
|
.from(heartbeatRuns);
|
||||||
|
const hasActiveRun = runs.some((run) => run.status === "queued" || run.status === "running");
|
||||||
|
if (!hasActiveRun) {
|
||||||
|
idlePolls += 1;
|
||||||
|
if (idlePolls >= 3) break;
|
||||||
|
} else {
|
||||||
|
idlePolls = 0;
|
||||||
|
}
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
}
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
await db.delete(activityLog);
|
||||||
|
await db.delete(companySkills);
|
||||||
|
await db.delete(issueComments);
|
||||||
|
await db.delete(issueDocuments);
|
||||||
|
await db.delete(documentRevisions);
|
||||||
|
await db.delete(documents);
|
||||||
|
await db.delete(issueRelations);
|
||||||
|
await db.delete(issues);
|
||||||
|
await db.delete(heartbeatRunEvents);
|
||||||
|
await db.delete(heartbeatRuns);
|
||||||
|
await db.delete(agentWakeupRequests);
|
||||||
|
await db.delete(agentRuntimeState);
|
||||||
|
await db.delete(agents);
|
||||||
|
await db.delete(companies);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await tempDb?.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("keeps blocked descendants queued until their blockers resolve", async () => {
|
||||||
|
const companyId = randomUUID();
|
||||||
|
const agentId = randomUUID();
|
||||||
|
const blockerId = randomUUID();
|
||||||
|
const blockedIssueId = randomUUID();
|
||||||
|
const readyIssueId = randomUUID();
|
||||||
|
|
||||||
|
await db.insert(companies).values({
|
||||||
|
id: companyId,
|
||||||
|
name: "Paperclip",
|
||||||
|
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||||
|
requireBoardApprovalForNewAgents: false,
|
||||||
|
});
|
||||||
|
await db.insert(agents).values({
|
||||||
|
id: agentId,
|
||||||
|
companyId,
|
||||||
|
name: "CodexCoder",
|
||||||
|
role: "engineer",
|
||||||
|
status: "active",
|
||||||
|
adapterType: "codex_local",
|
||||||
|
adapterConfig: {},
|
||||||
|
runtimeConfig: {
|
||||||
|
heartbeat: {
|
||||||
|
wakeOnDemand: true,
|
||||||
|
maxConcurrentRuns: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
permissions: {},
|
||||||
|
});
|
||||||
|
await db.insert(issues).values([
|
||||||
|
{
|
||||||
|
id: blockerId,
|
||||||
|
companyId,
|
||||||
|
title: "Mission 0",
|
||||||
|
status: "todo",
|
||||||
|
priority: "high",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: blockedIssueId,
|
||||||
|
companyId,
|
||||||
|
title: "Mission 2",
|
||||||
|
status: "todo",
|
||||||
|
priority: "medium",
|
||||||
|
assigneeAgentId: agentId,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: readyIssueId,
|
||||||
|
companyId,
|
||||||
|
title: "Mission 1",
|
||||||
|
status: "todo",
|
||||||
|
priority: "critical",
|
||||||
|
assigneeAgentId: agentId,
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
await db.insert(issueRelations).values({
|
||||||
|
companyId,
|
||||||
|
issueId: blockerId,
|
||||||
|
relatedIssueId: blockedIssueId,
|
||||||
|
type: "blocks",
|
||||||
|
});
|
||||||
|
|
||||||
|
const blockedWake = await heartbeat.wakeup(agentId, {
|
||||||
|
source: "assignment",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: { issueId: blockedIssueId },
|
||||||
|
contextSnapshot: { issueId: blockedIssueId, wakeReason: "issue_assigned" },
|
||||||
|
});
|
||||||
|
expect(blockedWake).not.toBeNull();
|
||||||
|
|
||||||
|
await waitForCondition(async () => {
|
||||||
|
const run = await db
|
||||||
|
.select({ status: heartbeatRuns.status })
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.id, blockedWake!.id))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
return run?.status === "queued";
|
||||||
|
});
|
||||||
|
|
||||||
|
const readyWake = await heartbeat.wakeup(agentId, {
|
||||||
|
source: "assignment",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: { issueId: readyIssueId },
|
||||||
|
contextSnapshot: { issueId: readyIssueId, wakeReason: "issue_assigned" },
|
||||||
|
});
|
||||||
|
expect(readyWake).not.toBeNull();
|
||||||
|
|
||||||
|
await waitForCondition(async () => {
|
||||||
|
const run = await db
|
||||||
|
.select({ status: heartbeatRuns.status })
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.id, readyWake!.id))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
return run?.status === "succeeded";
|
||||||
|
});
|
||||||
|
|
||||||
|
const [blockedRun, readyRun] = await Promise.all([
|
||||||
|
db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, blockedWake!.id)).then((rows) => rows[0] ?? null),
|
||||||
|
db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, readyWake!.id)).then((rows) => rows[0] ?? null),
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(blockedRun?.status).toBe("queued");
|
||||||
|
expect(readyRun?.status).toBe("succeeded");
|
||||||
|
|
||||||
|
await db
|
||||||
|
.update(issues)
|
||||||
|
.set({ status: "done", updatedAt: new Date() })
|
||||||
|
.where(eq(issues.id, blockerId));
|
||||||
|
|
||||||
|
await heartbeat.wakeup(agentId, {
|
||||||
|
source: "automation",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_blockers_resolved",
|
||||||
|
payload: { issueId: blockedIssueId, resolvedBlockerIssueId: blockerId },
|
||||||
|
contextSnapshot: {
|
||||||
|
issueId: blockedIssueId,
|
||||||
|
wakeReason: "issue_blockers_resolved",
|
||||||
|
resolvedBlockerIssueId: blockerId,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await waitForCondition(async () => {
|
||||||
|
const run = await db
|
||||||
|
.select({ status: heartbeatRuns.status })
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.id, blockedWake!.id))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
return run?.status === "succeeded";
|
||||||
|
});
|
||||||
|
|
||||||
|
const promotedBlockedRun = await db
|
||||||
|
.select({
|
||||||
|
id: heartbeatRuns.id,
|
||||||
|
status: heartbeatRuns.status,
|
||||||
|
})
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.id, blockedWake!.id))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
const blockedWakeRequestCount = await db
|
||||||
|
.select({ count: sql<number>`count(*)::int` })
|
||||||
|
.from(agentWakeupRequests)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(agentWakeupRequests.agentId, agentId),
|
||||||
|
sql`${agentWakeupRequests.payload} ->> 'issueId' = ${blockedIssueId}`,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.then((rows) => rows[0]?.count ?? 0);
|
||||||
|
|
||||||
|
expect(promotedBlockedRun?.status).toBe("succeeded");
|
||||||
|
expect(blockedWakeRequestCount).toBeGreaterThanOrEqual(2);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -1262,6 +1262,89 @@ describeEmbeddedPostgres("issueService blockers and dependency wake readiness",
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("reports dependency readiness for blocked issue chains", async () => {
|
||||||
|
const companyId = randomUUID();
|
||||||
|
await db.insert(companies).values({
|
||||||
|
id: companyId,
|
||||||
|
name: "Paperclip",
|
||||||
|
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||||
|
requireBoardApprovalForNewAgents: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
const blockerId = randomUUID();
|
||||||
|
const blockedId = randomUUID();
|
||||||
|
await db.insert(issues).values([
|
||||||
|
{ id: blockerId, companyId, title: "Blocker", status: "todo", priority: "medium" },
|
||||||
|
{ id: blockedId, companyId, title: "Blocked", status: "todo", priority: "medium" },
|
||||||
|
]);
|
||||||
|
await svc.update(blockedId, { blockedByIssueIds: [blockerId] });
|
||||||
|
|
||||||
|
await expect(svc.getDependencyReadiness(blockedId)).resolves.toMatchObject({
|
||||||
|
issueId: blockedId,
|
||||||
|
blockerIssueIds: [blockerId],
|
||||||
|
unresolvedBlockerIssueIds: [blockerId],
|
||||||
|
unresolvedBlockerCount: 1,
|
||||||
|
allBlockersDone: false,
|
||||||
|
isDependencyReady: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await svc.update(blockerId, { status: "done" });
|
||||||
|
|
||||||
|
await expect(svc.getDependencyReadiness(blockedId)).resolves.toMatchObject({
|
||||||
|
issueId: blockedId,
|
||||||
|
blockerIssueIds: [blockerId],
|
||||||
|
unresolvedBlockerIssueIds: [],
|
||||||
|
unresolvedBlockerCount: 0,
|
||||||
|
allBlockersDone: true,
|
||||||
|
isDependencyReady: true,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects execution when unresolved blockers remain", async () => {
|
||||||
|
const companyId = randomUUID();
|
||||||
|
const assigneeAgentId = randomUUID();
|
||||||
|
await db.insert(companies).values({
|
||||||
|
id: companyId,
|
||||||
|
name: "Paperclip",
|
||||||
|
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||||
|
requireBoardApprovalForNewAgents: false,
|
||||||
|
});
|
||||||
|
await db.insert(agents).values({
|
||||||
|
id: assigneeAgentId,
|
||||||
|
companyId,
|
||||||
|
name: "CodexCoder",
|
||||||
|
role: "engineer",
|
||||||
|
status: "active",
|
||||||
|
adapterType: "codex_local",
|
||||||
|
adapterConfig: {},
|
||||||
|
runtimeConfig: {},
|
||||||
|
permissions: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
const blockerId = randomUUID();
|
||||||
|
const blockedId = randomUUID();
|
||||||
|
await db.insert(issues).values([
|
||||||
|
{ id: blockerId, companyId, title: "Blocker", status: "todo", priority: "medium" },
|
||||||
|
{
|
||||||
|
id: blockedId,
|
||||||
|
companyId,
|
||||||
|
title: "Blocked",
|
||||||
|
status: "todo",
|
||||||
|
priority: "medium",
|
||||||
|
assigneeAgentId,
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
await svc.update(blockedId, { blockedByIssueIds: [blockerId] });
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
svc.update(blockedId, { status: "in_progress" }),
|
||||||
|
).rejects.toMatchObject({ status: 422 });
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
svc.checkout(blockedId, assigneeAgentId, ["todo", "blocked"], null),
|
||||||
|
).rejects.toMatchObject({ status: 422 });
|
||||||
|
});
|
||||||
|
|
||||||
it("wakes parents only when all direct children are terminal", async () => {
|
it("wakes parents only when all direct children are terminal", async () => {
|
||||||
const companyId = randomUUID();
|
const companyId = randomUUID();
|
||||||
const assigneeAgentId = randomUUID();
|
const assigneeAgentId = randomUUID();
|
||||||
|
|
|
||||||
|
|
@ -1184,6 +1184,10 @@ export function agentRoutes(db: Db) {
|
||||||
includeRoutineExecutions: true,
|
includeRoutineExecutions: true,
|
||||||
limit: ISSUE_LIST_DEFAULT_LIMIT,
|
limit: ISSUE_LIST_DEFAULT_LIMIT,
|
||||||
});
|
});
|
||||||
|
const dependencyReadiness = await issuesSvc.listDependencyReadiness(
|
||||||
|
req.actor.companyId,
|
||||||
|
rows.map((issue) => issue.id),
|
||||||
|
);
|
||||||
|
|
||||||
res.json(
|
res.json(
|
||||||
rows.map((issue) => ({
|
rows.map((issue) => ({
|
||||||
|
|
@ -1197,6 +1201,9 @@ export function agentRoutes(db: Db) {
|
||||||
parentId: issue.parentId,
|
parentId: issue.parentId,
|
||||||
updatedAt: issue.updatedAt,
|
updatedAt: issue.updatedAt,
|
||||||
activeRun: issue.activeRun,
|
activeRun: issue.activeRun,
|
||||||
|
dependencyReady: dependencyReadiness.get(issue.id)?.isDependencyReady ?? true,
|
||||||
|
unresolvedBlockerCount: dependencyReadiness.get(issue.id)?.unresolvedBlockerCount ?? 0,
|
||||||
|
unresolvedBlockerIssueIds: dependencyReadiness.get(issue.id)?.unresolvedBlockerIssueIds ?? [],
|
||||||
})),
|
})),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -1211,9 +1211,11 @@ function shouldAutoCheckoutIssueForWake(input: {
|
||||||
contextSnapshot: Record<string, unknown> | null | undefined;
|
contextSnapshot: Record<string, unknown> | null | undefined;
|
||||||
issueStatus: string | null;
|
issueStatus: string | null;
|
||||||
issueAssigneeAgentId: string | null;
|
issueAssigneeAgentId: string | null;
|
||||||
|
isDependencyReady: boolean;
|
||||||
agentId: string;
|
agentId: string;
|
||||||
}) {
|
}) {
|
||||||
if (input.issueAssigneeAgentId !== input.agentId) return false;
|
if (input.issueAssigneeAgentId !== input.agentId) return false;
|
||||||
|
if (!input.isDependencyReady) return false;
|
||||||
|
|
||||||
const issueStatus = readNonEmptyString(input.issueStatus);
|
const issueStatus = readNonEmptyString(input.issueStatus);
|
||||||
if (
|
if (
|
||||||
|
|
@ -3062,6 +3064,36 @@ export function heartbeatService(db: Db) {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function issueRunPriorityRank(priority: string | null | undefined) {
|
||||||
|
switch (priority) {
|
||||||
|
case "critical":
|
||||||
|
return 0;
|
||||||
|
case "high":
|
||||||
|
return 1;
|
||||||
|
case "medium":
|
||||||
|
return 2;
|
||||||
|
case "low":
|
||||||
|
return 3;
|
||||||
|
default:
|
||||||
|
return 4;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function listQueuedRunDependencyReadiness(
|
||||||
|
companyId: string,
|
||||||
|
queuedRuns: Array<typeof heartbeatRuns.$inferSelect>,
|
||||||
|
) {
|
||||||
|
const issueIds = [...new Set(
|
||||||
|
queuedRuns
|
||||||
|
.map((run) => readNonEmptyString(parseObject(run.contextSnapshot).issueId))
|
||||||
|
.filter((issueId): issueId is string => Boolean(issueId)),
|
||||||
|
)];
|
||||||
|
if (issueIds.length === 0) {
|
||||||
|
return new Map<string, Awaited<ReturnType<typeof issuesSvc.getDependencyReadiness>>>();
|
||||||
|
}
|
||||||
|
return issuesSvc.listDependencyReadiness(companyId, issueIds);
|
||||||
|
}
|
||||||
|
|
||||||
async function countRunningRunsForAgent(agentId: string) {
|
async function countRunningRunsForAgent(agentId: string) {
|
||||||
const [{ count }] = await db
|
const [{ count }] = await db
|
||||||
.select({ count: sql<number>`count(*)` })
|
.select({ count: sql<number>`count(*)` })
|
||||||
|
|
@ -3092,6 +3124,16 @@ export function heartbeatService(db: Db) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const issueId = readNonEmptyString(context.issueId);
|
||||||
|
if (issueId) {
|
||||||
|
const dependencyReadiness = await issuesSvc.listDependencyReadiness(run.companyId, [issueId]);
|
||||||
|
const unresolvedBlockerCount = dependencyReadiness.get(issueId)?.unresolvedBlockerCount ?? 0;
|
||||||
|
if (unresolvedBlockerCount > 0) {
|
||||||
|
logger.debug({ runId: run.id, issueId, unresolvedBlockerCount }, "claimQueuedRun: skipping blocked run");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const claimedAt = new Date();
|
const claimedAt = new Date();
|
||||||
const claimed = await db
|
const claimed = await db
|
||||||
.update(heartbeatRuns)
|
.update(heartbeatRuns)
|
||||||
|
|
@ -3859,12 +3901,49 @@ export function heartbeatService(db: Db) {
|
||||||
.select()
|
.select()
|
||||||
.from(heartbeatRuns)
|
.from(heartbeatRuns)
|
||||||
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
|
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
|
||||||
.orderBy(asc(heartbeatRuns.createdAt))
|
.orderBy(asc(heartbeatRuns.createdAt));
|
||||||
.limit(availableSlots);
|
|
||||||
if (queuedRuns.length === 0) return [];
|
if (queuedRuns.length === 0) return [];
|
||||||
|
|
||||||
|
const dependencyReadiness = await listQueuedRunDependencyReadiness(agent.companyId, queuedRuns);
|
||||||
|
const queuedIssueIds = [...new Set(
|
||||||
|
queuedRuns
|
||||||
|
.map((run) => readNonEmptyString(parseObject(run.contextSnapshot).issueId))
|
||||||
|
.filter((issueId): issueId is string => Boolean(issueId)),
|
||||||
|
)];
|
||||||
|
const issueRows = await db
|
||||||
|
.select({
|
||||||
|
id: issues.id,
|
||||||
|
status: issues.status,
|
||||||
|
priority: issues.priority,
|
||||||
|
})
|
||||||
|
.from(issues)
|
||||||
|
.where(
|
||||||
|
queuedIssueIds.length > 0
|
||||||
|
? and(eq(issues.companyId, agent.companyId), inArray(issues.id, queuedIssueIds))
|
||||||
|
: sql`false`,
|
||||||
|
);
|
||||||
|
const issueById = new Map(issueRows.map((row) => [row.id, row]));
|
||||||
|
const prioritizedRuns = [...queuedRuns].sort((left, right) => {
|
||||||
|
const leftIssueId = readNonEmptyString(parseObject(left.contextSnapshot).issueId);
|
||||||
|
const rightIssueId = readNonEmptyString(parseObject(right.contextSnapshot).issueId);
|
||||||
|
const leftReadiness = leftIssueId ? dependencyReadiness.get(leftIssueId) : null;
|
||||||
|
const rightReadiness = rightIssueId ? dependencyReadiness.get(rightIssueId) : null;
|
||||||
|
const leftReady = leftIssueId ? (leftReadiness?.isDependencyReady ?? true) : true;
|
||||||
|
const rightReady = rightIssueId ? (rightReadiness?.isDependencyReady ?? true) : true;
|
||||||
|
const leftIssue = leftIssueId ? issueById.get(leftIssueId) : null;
|
||||||
|
const rightIssue = rightIssueId ? issueById.get(rightIssueId) : null;
|
||||||
|
const leftRank = leftIssueId ? (leftReady ? (leftIssue?.status === "in_progress" ? 0 : 1) : 3) : 2;
|
||||||
|
const rightRank = rightIssueId ? (rightReady ? (rightIssue?.status === "in_progress" ? 0 : 1) : 3) : 2;
|
||||||
|
if (leftRank !== rightRank) return leftRank - rightRank;
|
||||||
|
const leftPriorityRank = issueRunPriorityRank(leftIssue?.priority);
|
||||||
|
const rightPriorityRank = issueRunPriorityRank(rightIssue?.priority);
|
||||||
|
if (leftPriorityRank !== rightPriorityRank) return leftPriorityRank - rightPriorityRank;
|
||||||
|
return left.createdAt.getTime() - right.createdAt.getTime();
|
||||||
|
});
|
||||||
|
|
||||||
const claimedRuns: Array<typeof heartbeatRuns.$inferSelect> = [];
|
const claimedRuns: Array<typeof heartbeatRuns.$inferSelect> = [];
|
||||||
for (const queuedRun of queuedRuns) {
|
for (const queuedRun of prioritizedRuns) {
|
||||||
|
if (claimedRuns.length >= availableSlots) break;
|
||||||
const claimed = await claimQueuedRun(queuedRun);
|
const claimed = await claimQueuedRun(queuedRun);
|
||||||
if (claimed) claimedRuns.push(claimed);
|
if (claimed) claimedRuns.push(claimed);
|
||||||
}
|
}
|
||||||
|
|
@ -3887,7 +3966,7 @@ export function heartbeatService(db: Db) {
|
||||||
if (run.status === "queued") {
|
if (run.status === "queued") {
|
||||||
const claimed = await claimQueuedRun(run);
|
const claimed = await claimQueuedRun(run);
|
||||||
if (!claimed) {
|
if (!claimed) {
|
||||||
// Another worker has already claimed or finalized this run.
|
// claimQueuedRun can also leave the run queued when dependencies are unresolved.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
run = claimed;
|
run = claimed;
|
||||||
|
|
@ -3918,6 +3997,9 @@ export function heartbeatService(db: Db) {
|
||||||
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
|
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
|
||||||
const issueId = readNonEmptyString(context.issueId);
|
const issueId = readNonEmptyString(context.issueId);
|
||||||
let issueContext = issueId ? await getIssueExecutionContext(agent.companyId, issueId) : null;
|
let issueContext = issueId ? await getIssueExecutionContext(agent.companyId, issueId) : null;
|
||||||
|
const issueDependencyReadiness = issueId
|
||||||
|
? await issuesSvc.listDependencyReadiness(agent.companyId, [issueId]).then((rows) => rows.get(issueId) ?? null)
|
||||||
|
: null;
|
||||||
if (
|
if (
|
||||||
issueId &&
|
issueId &&
|
||||||
issueContext &&
|
issueContext &&
|
||||||
|
|
@ -3925,6 +4007,7 @@ export function heartbeatService(db: Db) {
|
||||||
contextSnapshot: context,
|
contextSnapshot: context,
|
||||||
issueStatus: issueContext.status,
|
issueStatus: issueContext.status,
|
||||||
issueAssigneeAgentId: issueContext.assigneeAgentId,
|
issueAssigneeAgentId: issueContext.assigneeAgentId,
|
||||||
|
isDependencyReady: issueDependencyReadiness?.isDependencyReady ?? true,
|
||||||
agentId: agent.id,
|
agentId: agent.id,
|
||||||
})
|
})
|
||||||
) {
|
) {
|
||||||
|
|
|
||||||
|
|
@ -141,6 +141,14 @@ type IssueRelationSummaryMap = {
|
||||||
blockedBy: IssueRelationIssueSummary[];
|
blockedBy: IssueRelationIssueSummary[];
|
||||||
blocks: IssueRelationIssueSummary[];
|
blocks: IssueRelationIssueSummary[];
|
||||||
};
|
};
|
||||||
|
export type IssueDependencyReadiness = {
|
||||||
|
issueId: string;
|
||||||
|
blockerIssueIds: string[];
|
||||||
|
unresolvedBlockerIssueIds: string[];
|
||||||
|
unresolvedBlockerCount: number;
|
||||||
|
allBlockersDone: boolean;
|
||||||
|
isDependencyReady: boolean;
|
||||||
|
};
|
||||||
export type ChildIssueCompletionSummary = {
|
export type ChildIssueCompletionSummary = {
|
||||||
id: string;
|
id: string;
|
||||||
identifier: string | null;
|
identifier: string | null;
|
||||||
|
|
@ -191,6 +199,83 @@ function appendAcceptanceCriteriaToDescription(description: string | null | unde
|
||||||
return base ? `${base}\n\n${criteriaMarkdown}` : criteriaMarkdown;
|
return base ? `${base}\n\n${criteriaMarkdown}` : criteriaMarkdown;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function createIssueDependencyReadiness(issueId: string): IssueDependencyReadiness {
|
||||||
|
return {
|
||||||
|
issueId,
|
||||||
|
blockerIssueIds: [],
|
||||||
|
unresolvedBlockerIssueIds: [],
|
||||||
|
unresolvedBlockerCount: 0,
|
||||||
|
allBlockersDone: true,
|
||||||
|
isDependencyReady: true,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function listIssueDependencyReadinessMap(
|
||||||
|
dbOrTx: Pick<Db, "select">,
|
||||||
|
companyId: string,
|
||||||
|
issueIds: string[],
|
||||||
|
) {
|
||||||
|
const uniqueIssueIds = [...new Set(issueIds.filter(Boolean))];
|
||||||
|
const readinessMap = new Map<string, IssueDependencyReadiness>();
|
||||||
|
for (const issueId of uniqueIssueIds) {
|
||||||
|
readinessMap.set(issueId, createIssueDependencyReadiness(issueId));
|
||||||
|
}
|
||||||
|
if (uniqueIssueIds.length === 0) return readinessMap;
|
||||||
|
|
||||||
|
const blockerRows = await dbOrTx
|
||||||
|
.select({
|
||||||
|
issueId: issueRelations.relatedIssueId,
|
||||||
|
blockerIssueId: issueRelations.issueId,
|
||||||
|
blockerStatus: issues.status,
|
||||||
|
})
|
||||||
|
.from(issueRelations)
|
||||||
|
.innerJoin(issues, eq(issueRelations.issueId, issues.id))
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(issueRelations.companyId, companyId),
|
||||||
|
eq(issueRelations.type, "blocks"),
|
||||||
|
inArray(issueRelations.relatedIssueId, uniqueIssueIds),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const row of blockerRows) {
|
||||||
|
const current = readinessMap.get(row.issueId) ?? createIssueDependencyReadiness(row.issueId);
|
||||||
|
current.blockerIssueIds.push(row.blockerIssueId);
|
||||||
|
// Only done blockers resolve dependents; cancelled blockers stay unresolved
|
||||||
|
// until an operator removes or replaces the blocker relationship explicitly.
|
||||||
|
if (row.blockerStatus !== "done") {
|
||||||
|
current.unresolvedBlockerIssueIds.push(row.blockerIssueId);
|
||||||
|
current.unresolvedBlockerCount += 1;
|
||||||
|
current.allBlockersDone = false;
|
||||||
|
current.isDependencyReady = false;
|
||||||
|
}
|
||||||
|
readinessMap.set(row.issueId, current);
|
||||||
|
}
|
||||||
|
|
||||||
|
return readinessMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function listUnresolvedBlockerIssueIds(
|
||||||
|
dbOrTx: Pick<Db, "select">,
|
||||||
|
companyId: string,
|
||||||
|
blockerIssueIds: string[],
|
||||||
|
) {
|
||||||
|
const uniqueBlockerIssueIds = [...new Set(blockerIssueIds.filter(Boolean))];
|
||||||
|
if (uniqueBlockerIssueIds.length === 0) return [];
|
||||||
|
return dbOrTx
|
||||||
|
.select({ id: issues.id })
|
||||||
|
.from(issues)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(issues.companyId, companyId),
|
||||||
|
inArray(issues.id, uniqueBlockerIssueIds),
|
||||||
|
// Cancelled blockers intentionally remain unresolved until the relation changes.
|
||||||
|
ne(issues.status, "done"),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.then((rows) => rows.map((row) => row.id));
|
||||||
|
}
|
||||||
|
|
||||||
async function getProjectDefaultGoalId(
|
async function getProjectDefaultGoalId(
|
||||||
db: ProjectGoalReader,
|
db: ProjectGoalReader,
|
||||||
companyId: string,
|
companyId: string,
|
||||||
|
|
@ -1418,6 +1503,21 @@ export function issueService(db: Db) {
|
||||||
return relations.get(issueId) ?? { blockedBy: [], blocks: [] };
|
return relations.get(issueId) ?? { blockedBy: [], blocks: [] };
|
||||||
},
|
},
|
||||||
|
|
||||||
|
getDependencyReadiness: async (issueId: string, dbOrTx: any = db) => {
|
||||||
|
const issue = await dbOrTx
|
||||||
|
.select({ id: issues.id, companyId: issues.companyId })
|
||||||
|
.from(issues)
|
||||||
|
.where(eq(issues.id, issueId))
|
||||||
|
.then((rows: Array<{ id: string; companyId: string }>) => rows[0] ?? null);
|
||||||
|
if (!issue) throw notFound("Issue not found");
|
||||||
|
const readiness = await listIssueDependencyReadinessMap(dbOrTx, issue.companyId, [issueId]);
|
||||||
|
return readiness.get(issueId) ?? createIssueDependencyReadiness(issueId);
|
||||||
|
},
|
||||||
|
|
||||||
|
listDependencyReadiness: async (companyId: string, issueIds: string[], dbOrTx: any = db) => {
|
||||||
|
return listIssueDependencyReadinessMap(dbOrTx, companyId, issueIds);
|
||||||
|
},
|
||||||
|
|
||||||
listWakeableBlockedDependents: async (blockerIssueId: string) => {
|
listWakeableBlockedDependents: async (blockerIssueId: string) => {
|
||||||
const blockerIssue = await db
|
const blockerIssue = await db
|
||||||
.select({ id: issues.id, companyId: issues.companyId })
|
.select({ id: issues.id, companyId: issues.companyId })
|
||||||
|
|
@ -1838,6 +1938,16 @@ export function issueService(db: Db) {
|
||||||
if (patch.status === "in_progress" && !nextAssigneeAgentId && !nextAssigneeUserId) {
|
if (patch.status === "in_progress" && !nextAssigneeAgentId && !nextAssigneeUserId) {
|
||||||
throw unprocessable("in_progress issues require an assignee");
|
throw unprocessable("in_progress issues require an assignee");
|
||||||
}
|
}
|
||||||
|
if (patch.status === "in_progress") {
|
||||||
|
const unresolvedBlockerIssueIds = blockedByIssueIds !== undefined
|
||||||
|
? await listUnresolvedBlockerIssueIds(dbOrTx, existing.companyId, blockedByIssueIds)
|
||||||
|
: (
|
||||||
|
await listIssueDependencyReadinessMap(dbOrTx, existing.companyId, [id])
|
||||||
|
).get(id)?.unresolvedBlockerIssueIds ?? [];
|
||||||
|
if (unresolvedBlockerIssueIds.length > 0) {
|
||||||
|
throw unprocessable("Issue is blocked by unresolved blockers", { unresolvedBlockerIssueIds });
|
||||||
|
}
|
||||||
|
}
|
||||||
if (issueData.assigneeAgentId) {
|
if (issueData.assigneeAgentId) {
|
||||||
await assertAssignableAgent(existing.companyId, issueData.assigneeAgentId);
|
await assertAssignableAgent(existing.companyId, issueData.assigneeAgentId);
|
||||||
}
|
}
|
||||||
|
|
@ -2007,6 +2117,12 @@ export function issueService(db: Db) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const dependencyReadiness = await listIssueDependencyReadinessMap(db, issueCompany.companyId, [id]);
|
||||||
|
const unresolvedBlockerIssueIds = dependencyReadiness.get(id)?.unresolvedBlockerIssueIds ?? [];
|
||||||
|
if (unresolvedBlockerIssueIds.length > 0) {
|
||||||
|
throw unprocessable("Issue is blocked by unresolved blockers", { unresolvedBlockerIssueIds });
|
||||||
|
}
|
||||||
|
|
||||||
const sameRunAssigneeCondition = checkoutRunId
|
const sameRunAssigneeCondition = checkoutRunId
|
||||||
? and(
|
? and(
|
||||||
eq(issues.assigneeAgentId, agentId),
|
eq(issues.assigneeAgentId, agentId),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue