[codex] Detect issue graph liveness deadlocks (#4209)

## Thinking Path

> - Paperclip orchestrates AI agents for zero-human companies.
> - The heartbeat harness is responsible for waking agents, reconciling
issue state, and keeping execution moving.
> - Some dependency graphs can become live-locks when a blocked issue
depends on an unassigned, cancelled, or otherwise uninvokable issue.
> - Review and approval stages can also stall when the recorded
participant can no longer be resolved.
> - This pull request adds issue graph liveness classification plus
heartbeat reconciliation that creates durable escalation work for those
cases.
> - The benefit is that harness-level deadlocks become visible,
assigned, logged, and recoverable instead of silently leaving task
sequences blocked.

## What Changed

- Added an issue graph liveness classifier for blocked dependency and
invalid review participant states.
- Added heartbeat reconciliation that creates one stable escalation
issue per liveness incident, links it as a blocker, comments on the
affected issue, wakes the recommended owner, and logs activity.
- Wired startup and periodic server reconciliation for issue graph
liveness incidents.
- Added focused tests for classifier behavior, heartbeat escalation
creation/deduplication, and queued dependency wake promotion.
- Fixed queued issue wakes so a coalesced wake re-runs queue selection,
allowing dependency-unblocked work to start immediately.

## Verification

- `pnpm exec vitest run
server/src/__tests__/heartbeat-dependency-scheduling.test.ts
server/src/__tests__/issue-liveness.test.ts
server/src/__tests__/heartbeat-issue-liveness-escalation.test.ts`
- Passed locally: `server/src/__tests__/issue-liveness.test.ts` (5
tests)
- Skipped locally: embedded Postgres suites because optional package
`@embedded-postgres/darwin-x64` is not installed on this host
- `pnpm --filter @paperclipai/server typecheck`
- `git diff --check`
- Greptile review loop: ran 3 times as requested; the final
Greptile-reviewed head `0a864eab` had 0 comments and all Greptile
threads were resolved. Later commits are CI/test-stability fixes after
the requested max Greptile pass count.
- GitHub PR checks on head `87493ed4`: `policy`, `verify`, `e2e`, and
`security/snyk (cryppadotta)` all passed.

## Risks

- Moderate operational risk: the reconciler creates escalation issues
automatically, so incorrect classification could create noise. Stable
incident keys and deduplication limit repeated escalation.
- Low schema risk: this uses existing issue, relation, comment, wake,
and activity log tables with no migration.
- No UI screenshots included because this change is server-side harness
behavior only.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected — check the roadmap
first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI Codex, GPT-5-based coding agent. Exact runtime model ID and
context window were not exposed in this session. Used tool execution for
git, tests, typecheck, Greptile review handling, and GitHub CLI
operations.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [x] If this change affects the UI, I have included before/after
screenshots
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge
This commit is contained in:
Dotta 2026-04-21 09:11:12 -05:00 committed by GitHub
parent 8d0c3d2fe6
commit 1954eb3048
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1171 additions and 2 deletions

View file

@ -3,7 +3,7 @@ import path from "node:path";
import { execFile as execFileCallback } from "node:child_process";
import { promisify } from "node:util";
import { randomUUID } from "node:crypto";
import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, or, sql } from "drizzle-orm";
import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, notInArray, or, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import {
AGENT_DEFAULT_MAX_CONCURRENT_RUNS,
@ -25,6 +25,7 @@ import {
heartbeatRunEvents,
heartbeatRuns,
issueComments,
issueRelations,
issues,
issueWorkProducts,
projects,
@ -61,6 +62,10 @@ import {
classifyRunLiveness,
type RunLivenessClassificationInput,
} from "./run-liveness.js";
import {
classifyIssueGraphLiveness,
type IssueLivenessFinding,
} from "./issue-liveness.js";
import { logActivity, publishPluginDomainEvent, type LogActivityInput } from "./activity-log.js";
import {
buildWorkspaceReadyComment,
@ -3830,6 +3835,363 @@ export function heartbeatService(db: Db) {
return result;
}
function issueIdFromRunContext(contextSnapshot: unknown) {
const context = parseObject(contextSnapshot);
return readNonEmptyString(context.issueId) ?? readNonEmptyString(context.taskId);
}
function issueIdFromWakePayload(payload: unknown) {
const parsed = parseObject(payload);
const nestedContext = parseObject(parsed[DEFERRED_WAKE_CONTEXT_KEY]);
return readNonEmptyString(parsed.issueId) ??
readNonEmptyString(nestedContext.issueId) ??
readNonEmptyString(nestedContext.taskId);
}
async function collectIssueGraphLivenessFindings() {
const [issueRows, relationRows, agentRows, activeRunRows, wakeRows] = await Promise.all([
db
.select({
id: issues.id,
companyId: issues.companyId,
identifier: issues.identifier,
title: issues.title,
status: issues.status,
projectId: issues.projectId,
goalId: issues.goalId,
parentId: issues.parentId,
assigneeAgentId: issues.assigneeAgentId,
assigneeUserId: issues.assigneeUserId,
createdByAgentId: issues.createdByAgentId,
createdByUserId: issues.createdByUserId,
executionState: issues.executionState,
})
.from(issues)
.where(isNull(issues.hiddenAt)),
db
.select({
companyId: issueRelations.companyId,
blockerIssueId: issueRelations.issueId,
blockedIssueId: issueRelations.relatedIssueId,
})
.from(issueRelations)
.where(eq(issueRelations.type, "blocks")),
db
.select({
id: agents.id,
companyId: agents.companyId,
name: agents.name,
role: agents.role,
title: agents.title,
status: agents.status,
reportsTo: agents.reportsTo,
})
.from(agents),
db
.select({
companyId: heartbeatRuns.companyId,
agentId: heartbeatRuns.agentId,
status: heartbeatRuns.status,
contextSnapshot: heartbeatRuns.contextSnapshot,
})
.from(heartbeatRuns)
.where(inArray(heartbeatRuns.status, [...ACTIVE_HEARTBEAT_RUN_STATUSES])),
db
.select({
companyId: agentWakeupRequests.companyId,
agentId: agentWakeupRequests.agentId,
status: agentWakeupRequests.status,
payload: agentWakeupRequests.payload,
})
.from(agentWakeupRequests)
.where(inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"])),
]);
return classifyIssueGraphLiveness({
issues: issueRows,
relations: relationRows,
agents: agentRows,
activeRuns: activeRunRows.map((row) => ({
companyId: row.companyId,
agentId: row.agentId,
status: row.status,
issueId: issueIdFromRunContext(row.contextSnapshot),
})),
queuedWakeRequests: wakeRows.map((row) => ({
companyId: row.companyId,
agentId: row.agentId,
status: row.status,
issueId: issueIdFromWakePayload(row.payload),
})),
});
}
async function findOpenLivenessEscalation(companyId: string, incidentKey: string) {
return db
.select()
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, "harness_liveness_escalation"),
eq(issues.originId, incidentKey),
isNull(issues.hiddenAt),
notInArray(issues.status, ["done", "cancelled"]),
),
)
.limit(1)
.then((rows) => rows[0] ?? null);
}
async function existingBlockerIssueIds(companyId: string, issueId: string) {
return db
.select({ blockerIssueId: issueRelations.issueId })
.from(issueRelations)
.where(
and(
eq(issueRelations.companyId, companyId),
eq(issueRelations.relatedIssueId, issueId),
eq(issueRelations.type, "blocks"),
),
)
.then((rows) => rows.map((row) => row.blockerIssueId));
}
function formatDependencyPath(finding: IssueLivenessFinding) {
return finding.dependencyPath
.map((entry) => entry.identifier ?? entry.issueId)
.join(" -> ");
}
function buildLivenessEscalationDescription(finding: IssueLivenessFinding) {
return [
"Paperclip detected a harness-level issue graph liveness incident.",
"",
`- Incident key: \`${finding.incidentKey}\``,
`- Finding: \`${finding.state}\``,
`- Dependency path: ${formatDependencyPath(finding)}`,
`- Reason: ${finding.reason}`,
`- Requested action: ${finding.recommendedAction}`,
"",
"Resolve the blocked chain, then mark this escalation issue done so the original issue can resume when all blockers are cleared.",
].join("\n");
}
function buildLivenessOriginalIssueComment(finding: IssueLivenessFinding, escalation: typeof issues.$inferSelect) {
return [
"Paperclip detected a harness-level liveness incident in this issue's dependency graph.",
"",
`- Escalation issue: ${escalation.identifier ?? escalation.id}`,
`- Incident key: \`${finding.incidentKey}\``,
`- Finding: \`${finding.state}\``,
`- Dependency path: ${formatDependencyPath(finding)}`,
`- Reason: ${finding.reason}`,
`- Manager action requested: ${finding.recommendedAction}`,
"",
"This issue now keeps its existing blockers and is also blocked by the escalation issue so dependency wakeups remain explicit.",
].join("\n");
}
async function resolveEscalationOwnerAgentId(
finding: IssueLivenessFinding,
issue: typeof issues.$inferSelect,
) {
const candidates = [
finding.recommendedOwnerAgentId,
...finding.recommendedOwnerCandidateAgentIds,
].filter((candidate): candidate is string => Boolean(candidate));
for (const candidate of [...new Set(candidates)]) {
const budgetBlock = await budgets.getInvocationBlock(issue.companyId, candidate, {
issueId: issue.id,
projectId: issue.projectId,
});
if (!budgetBlock) return candidate;
}
return null;
}
async function ensureIssueBlockedByEscalation(input: {
issue: typeof issues.$inferSelect;
escalationIssueId: string;
finding: IssueLivenessFinding;
runId?: string | null;
}) {
const blockerIds = await existingBlockerIssueIds(input.issue.companyId, input.issue.id);
const nextBlockerIds = [...new Set([...blockerIds, input.escalationIssueId])];
const update: Partial<typeof issues.$inferInsert> & { blockedByIssueIds: string[] } = {
blockedByIssueIds: nextBlockerIds,
};
if (input.issue.status !== "blocked") {
update.status = "blocked";
}
const updated = await issuesSvc.update(input.issue.id, update);
if (!updated) return null;
await logActivity(db, {
companyId: input.issue.companyId,
actorType: "system",
actorId: "system",
agentId: null,
runId: input.runId ?? null,
action: "issue.blockers.updated",
entityType: "issue",
entityId: input.issue.id,
details: {
source: "heartbeat.reconcile_issue_graph_liveness",
incidentKey: input.finding.incidentKey,
findingState: input.finding.state,
blockerIssueIds: nextBlockerIds,
escalationIssueId: input.escalationIssueId,
status: update.status ?? input.issue.status,
previousStatus: input.issue.status,
},
});
return updated;
}
async function createIssueGraphLivenessEscalation(input: {
finding: IssueLivenessFinding;
runId?: string | null;
}) {
const issue = await db
.select()
.from(issues)
.where(eq(issues.id, input.finding.issueId))
.then((rows) => rows[0] ?? null);
if (!issue || issue.companyId !== input.finding.companyId) return { kind: "skipped" as const };
const existing = await findOpenLivenessEscalation(issue.companyId, input.finding.incidentKey);
if (existing) {
await ensureIssueBlockedByEscalation({
issue,
escalationIssueId: existing.id,
finding: input.finding,
runId: input.runId ?? null,
});
return { kind: "existing" as const, escalationIssueId: existing.id };
}
const ownerAgentId = await resolveEscalationOwnerAgentId(input.finding, issue);
if (!ownerAgentId) return { kind: "skipped" as const };
const escalation = await issuesSvc.create(issue.companyId, {
title: `Unblock liveness incident for ${issue.identifier ?? issue.title}`,
description: buildLivenessEscalationDescription(input.finding),
status: "todo",
priority: "high",
parentId: issue.id,
projectId: issue.projectId,
goalId: issue.goalId,
assigneeAgentId: ownerAgentId,
originKind: "harness_liveness_escalation",
originId: input.finding.incidentKey,
billingCode: issue.billingCode,
inheritExecutionWorkspaceFromIssueId: issue.id,
});
await ensureIssueBlockedByEscalation({
issue,
escalationIssueId: escalation.id,
finding: input.finding,
runId: input.runId ?? null,
});
await issuesSvc.addComment(
issue.id,
buildLivenessOriginalIssueComment(input.finding, escalation),
{ runId: input.runId ?? null },
);
await logActivity(db, {
companyId: issue.companyId,
actorType: "system",
actorId: "system",
agentId: ownerAgentId,
runId: input.runId ?? null,
action: "issue.harness_liveness_escalation_created",
entityType: "issue",
entityId: escalation.id,
details: {
source: "heartbeat.reconcile_issue_graph_liveness",
incidentKey: input.finding.incidentKey,
findingState: input.finding.state,
sourceIssueId: issue.id,
sourceIdentifier: issue.identifier,
escalationIssueId: escalation.id,
escalationIdentifier: escalation.identifier,
dependencyPath: input.finding.dependencyPath,
},
});
const wake = await enqueueWakeup(ownerAgentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: {
issueId: escalation.id,
sourceIssueId: issue.id,
incidentKey: input.finding.incidentKey,
},
requestedByActorType: "system",
requestedByActorId: null,
contextSnapshot: {
issueId: escalation.id,
taskId: escalation.id,
wakeReason: "issue_assigned",
source: "harness_liveness_escalation",
sourceIssueId: issue.id,
incidentKey: input.finding.incidentKey,
},
});
logger.warn({
incidentKey: input.finding.incidentKey,
findingState: input.finding.state,
sourceIssueId: issue.id,
escalationIssueId: escalation.id,
ownerAgentId,
wakeupRunId: wake?.id ?? null,
}, "created issue graph liveness escalation");
return { kind: "created" as const, escalationIssueId: escalation.id };
}
async function reconcileIssueGraphLiveness(opts?: { runId?: string | null }) {
const findings = await collectIssueGraphLivenessFindings();
const result = {
findings: findings.length,
escalationsCreated: 0,
existingEscalations: 0,
skipped: 0,
issueIds: [] as string[],
escalationIssueIds: [] as string[],
};
for (const finding of findings) {
const escalation = await createIssueGraphLivenessEscalation({
finding,
runId: opts?.runId ?? null,
});
if (escalation.kind === "created") {
result.escalationsCreated += 1;
result.issueIds.push(finding.issueId);
result.escalationIssueIds.push(escalation.escalationIssueId);
} else if (escalation.kind === "existing") {
result.existingEscalations += 1;
result.issueIds.push(finding.issueId);
result.escalationIssueIds.push(escalation.escalationIssueId);
} else {
result.skipped += 1;
}
}
return result;
}
async function updateRuntimeState(
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
@ -5590,7 +5952,10 @@ export function heartbeatService(db: Db) {
});
if (outcome.kind === "deferred" || outcome.kind === "skipped") return null;
if (outcome.kind === "coalesced") return outcome.run;
if (outcome.kind === "coalesced") {
await startNextQueuedRunForAgent(agent.id);
return outcome.run;
}
const newRun = outcome.run;
publishLiveEvent({
@ -6164,6 +6529,8 @@ export function heartbeatService(db: Db) {
reconcileStrandedAssignedIssues,
reconcileIssueGraphLiveness,
tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;