Cancel stale queued heartbeats when issue graph changes (PAP-2314) (#4534)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Dotta 2026-04-26 21:17:38 -05:00 committed by GitHub
parent 868d08903e
commit 82e257c7ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1991 additions and 238 deletions

View file

@ -77,6 +77,7 @@ import {
sanitizeRuntimeServiceBaseEnv,
} from "./workspace-runtime.js";
import { issueService } from "./issues.js";
import { parseIssueExecutionState } from "./issue-execution-policy.js";
import {
ISSUE_TREE_CONTROL_INTERACTION_WAKE_REASONS,
isVerifiedIssueTreeControlInteractionWake,
@ -3792,6 +3793,16 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
logger.info({ runId: run.id, issueId, unresolvedBlockerCount }, "claimQueuedRun: cancelled blocked queued run");
return null;
}
const staleness = await evaluateQueuedRunStaleness(run, issueId, context);
if (staleness.stale) {
await cancelQueuedRunForStaleIssue(run, issueId, staleness);
logger.info(
{ runId: run.id, issueId, errorCode: staleness.errorCode },
"claimQueuedRun: cancelled stale queued run",
);
return null;
}
}
const claimedAt = new Date();
@ -3912,6 +3923,151 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
return cancelled;
}
type QueuedRunStaleness =
| { stale: false }
| {
stale: true;
reason: string;
errorCode:
| "issue_not_found"
| "issue_assignee_changed"
| "issue_terminal_status"
| "issue_review_participant_changed";
details: Record<string, unknown>;
};
async function evaluateQueuedRunStaleness(
run: typeof heartbeatRuns.$inferSelect,
issueId: string,
context: Record<string, unknown>,
): Promise<QueuedRunStaleness> {
const issue = await db
.select({
id: issues.id,
status: issues.status,
assigneeAgentId: issues.assigneeAgentId,
executionState: issues.executionState,
})
.from(issues)
.where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId)))
.then((rows) => rows[0] ?? null);
if (!issue) {
return {
stale: true,
errorCode: "issue_not_found",
reason: "Cancelled because the target issue no longer exists",
details: { issueId },
};
}
const wakeCommentId = deriveCommentId(context, null);
const isInteractionWake = allowsIssueInteractionWake(context);
const resumeIntent = context.resumeIntent === true || context.followUpRequested === true;
if (issue.assigneeAgentId !== run.agentId && !isInteractionWake) {
return {
stale: true,
errorCode: "issue_assignee_changed",
reason:
"Cancelled because issue assignee changed before the queued run could start; the new owner will be woken instead",
details: {
issueId,
previousAssigneeAgentId: run.agentId,
currentAssigneeAgentId: issue.assigneeAgentId,
},
};
}
if (issue.status === "done" || issue.status === "cancelled") {
if (!resumeIntent && !wakeCommentId) {
return {
stale: true,
errorCode: "issue_terminal_status",
reason: `Cancelled because issue reached terminal status (${issue.status}) before the queued run could start`,
details: { issueId, currentStatus: issue.status },
};
}
}
if (issue.status === "in_review") {
const executionState = parseIssueExecutionState(issue.executionState);
const currentParticipant = executionState?.currentParticipant ?? null;
if (currentParticipant) {
const participantMatches =
currentParticipant.type === "agent" && currentParticipant.agentId === run.agentId;
if (!participantMatches && !wakeCommentId) {
return {
stale: true,
errorCode: "issue_review_participant_changed",
reason:
"Cancelled because the in-review participant changed before the queued run could start; the current participant will be woken instead",
details: {
issueId,
currentStageType: executionState?.currentStageType ?? null,
currentParticipant,
},
};
}
}
}
return { stale: false };
}
async function cancelQueuedRunForStaleIssue(
run: typeof heartbeatRuns.$inferSelect,
issueId: string,
staleness: Extract<QueuedRunStaleness, { stale: true }>,
) {
const now = new Date();
const cancelled = await setRunStatus(run.id, "cancelled", {
finishedAt: now,
error: staleness.reason,
errorCode: staleness.errorCode,
resultJson: {
...parseObject(run.resultJson),
stopReason: staleness.errorCode,
effectiveTimeoutSec: 0,
timeoutConfigured: false,
timeoutSource: "stale_queued_run_gate",
timeoutFired: false,
},
});
if (!cancelled) return null;
await setWakeupStatus(run.wakeupRequestId, "skipped", {
finishedAt: now,
error: staleness.reason,
});
await db
.update(issues)
.set({
executionRunId: null,
executionAgentNameKey: null,
executionLockedAt: null,
updatedAt: now,
})
.where(
and(
eq(issues.companyId, run.companyId),
eq(issues.id, issueId),
eq(issues.executionRunId, run.id),
),
);
await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: staleness.reason,
payload: staleness.details,
});
return cancelled;
}
async function finalizeAgentStatus(
agentId: string,
outcome: "succeeded" | "failed" | "cancelled" | "timed_out",