From 6916e30f8ed440a0948255c96dc11251a098fb8d Mon Sep 17 00:00:00 2001 From: Dotta <34892728+cryppadotta@users.noreply.github.com> Date: Fri, 24 Apr 2026 19:24:13 -0500 Subject: [PATCH] Cancel stale retries when issue ownership changes (#4445) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies > - Issue execution is guarded by run locks and bounded retry scheduling > - A failed run can schedule a retry, but the issue may be reassigned before that retry becomes due > - The old assignee's scheduled retry should not continue to hold or reclaim execution for the issue > - This pull request cancels stale scheduled retries when ownership changes and cancels live work when an issue is explicitly cancelled > - The benefit is cleaner issue handoff semantics and fewer stranded or incorrect execution locks ## What Changed - Cancel scheduled retry runs when their issue has been reassigned before the retry is promoted. - Clear stale issue execution locks and cancel the associated wakeup request when a stale retry is cancelled. - Avoid deferring a new assignee behind a previous assignee's scheduled retry. - Cancel an active run when an issue status is explicitly changed to `cancelled`, while leaving `done` transitions alone. - Added route and heartbeat regressions for reassignment and cancellation behavior. ## Verification - `pnpm exec vitest run --project @paperclipai/server server/src/__tests__/heartbeat-retry-scheduling.test.ts server/src/__tests__/issue-comment-reopen-routes.test.ts --pool=forks --poolOptions.forks.isolate=true` - `issue-comment-reopen-routes.test.ts`: 28 passed. - `heartbeat-retry-scheduling.test.ts`: skipped by the existing embedded Postgres host guard (`Postgres init script exited with code null`). - `pnpm --filter @paperclipai/server typecheck` ## Risks - Medium risk because this changes heartbeat retry lifecycle behavior. 
- The cancellation path is scoped to scheduled retries whose issue assignee no longer matches the retrying agent, and logs a lifecycle event for auditability. - No migrations. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex, GPT-5 coding agent, tool-enabled with shell/GitHub/Paperclip API access. Context window was not reported by the runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip --- .../heartbeat-retry-scheduling.test.ts | 376 ++++++++++++++++++ .../issue-comment-reopen-routes.test.ts | 67 ++++ server/src/routes/issues.ts | 42 ++ server/src/services/heartbeat.ts | 206 +++++++++- 4 files changed, 676 insertions(+), 15 deletions(-) diff --git a/server/src/__tests__/heartbeat-retry-scheduling.test.ts b/server/src/__tests__/heartbeat-retry-scheduling.test.ts index 162ea539..9999c05e 100644 --- a/server/src/__tests__/heartbeat-retry-scheduling.test.ts +++ b/server/src/__tests__/heartbeat-retry-scheduling.test.ts @@ -3,11 +3,14 @@ import { eq, sql } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; import { agents, + agentRuntimeState, agentWakeupRequests, companies, createDb, + 
environmentLeases, heartbeatRunEvents, heartbeatRuns, + issues, } from "@paperclipai/db"; import { getEmbeddedPostgresTestSupport, @@ -40,8 +43,11 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => { afterEach(async () => { await db.delete(heartbeatRunEvents); + await db.delete(environmentLeases); + await db.delete(issues); await db.delete(heartbeatRuns); await db.delete(agentWakeupRequests); + await db.delete(agentRuntimeState); await db.delete(agents); await db.delete(companies); }); @@ -212,6 +218,376 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => { expect(promotedRun?.status).toBe("queued"); }); + it("does not defer a new assignee behind the previous assignee's scheduled retry", async () => { + const companyId = randomUUID(); + const oldAgentId = randomUUID(); + const newAgentId = randomUUID(); + const issueId = randomUUID(); + const sourceRunId = randomUUID(); + const now = new Date("2026-04-20T13:00:00.000Z"); + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(agents).values([ + { + id: oldAgentId, + companyId, + name: "ClaudeCoder", + role: "engineer", + status: "active", + adapterType: "claude_local", + adapterConfig: {}, + runtimeConfig: { + heartbeat: { + wakeOnDemand: true, + maxConcurrentRuns: 1, + }, + }, + permissions: {}, + }, + { + id: newAgentId, + companyId, + name: "CodexCoder", + role: "engineer", + status: "active", + adapterType: "codex_local", + adapterConfig: {}, + runtimeConfig: { + heartbeat: { + wakeOnDemand: true, + maxConcurrentRuns: 1, + }, + }, + permissions: {}, + }, + ]); + + await db.insert(heartbeatRuns).values({ + id: sourceRunId, + companyId, + agentId: oldAgentId, + invocationSource: "assignment", + triggerDetail: "system", + status: "failed", + error: "upstream overload", + errorCode: "adapter_failed", + 
finishedAt: now, + contextSnapshot: { + issueId, + wakeReason: "issue_assigned", + }, + updatedAt: now, + createdAt: now, + }); + + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Retry reassignment", + status: "todo", + priority: "medium", + assigneeAgentId: oldAgentId, + executionRunId: sourceRunId, + executionAgentNameKey: "claudecoder", + executionLockedAt: now, + issueNumber: 1, + identifier: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-1`, + }); + + const scheduled = await heartbeat.scheduleBoundedRetry(sourceRunId, { + now, + random: () => 0.5, + }); + expect(scheduled.outcome).toBe("scheduled"); + if (scheduled.outcome !== "scheduled") return; + + await db.update(issues).set({ + assigneeAgentId: newAgentId, + updatedAt: now, + }).where(eq(issues.id, issueId)); + + // Keep the new agent's queue from auto-claiming/executing during this unit test. + await db.insert(heartbeatRuns).values( + Array.from({ length: 5 }, () => ({ + id: randomUUID(), + companyId, + agentId: newAgentId, + invocationSource: "automation", + triggerDetail: "system", + status: "running", + contextSnapshot: { + wakeReason: "test_busy_slot", + }, + startedAt: now, + updatedAt: now, + createdAt: now, + })), + ); + + const newAssigneeRun = await heartbeat.wakeup(newAgentId, { + source: "assignment", + triggerDetail: "system", + reason: "issue_assigned", + payload: { + issueId, + mutation: "update", + }, + contextSnapshot: { + issueId, + source: "issue.update", + }, + requestedByActorType: "user", + requestedByActorId: "local-board", + }); + + expect(newAssigneeRun).not.toBeNull(); + expect(newAssigneeRun?.agentId).toBe(newAgentId); + expect(newAssigneeRun?.status).toBe("queued"); + + const oldRetry = await db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, scheduled.run.id)) + .then((rows) => rows[0] ?? 
null); + expect(oldRetry).toEqual({ + status: "cancelled", + errorCode: "issue_reassigned", + }); + + const deferredWakeups = await db + .select({ count: sql`count(*)::int` }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.status, "deferred_issue_execution")) + .then((rows) => rows[0]?.count ?? 0); + expect(deferredWakeups).toBe(0); + }); + + it("does not promote a scheduled retry after issue ownership changes", async () => { + const companyId = randomUUID(); + const oldAgentId = randomUUID(); + const newAgentId = randomUUID(); + const issueId = randomUUID(); + const sourceRunId = randomUUID(); + const now = new Date("2026-04-20T14:00:00.000Z"); + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(agents).values([ + { + id: oldAgentId, + companyId, + name: "ClaudeCoder", + role: "engineer", + status: "active", + adapterType: "claude_local", + adapterConfig: {}, + runtimeConfig: { + heartbeat: { + wakeOnDemand: true, + maxConcurrentRuns: 1, + }, + }, + permissions: {}, + }, + { + id: newAgentId, + companyId, + name: "CodexCoder", + role: "engineer", + status: "active", + adapterType: "codex_local", + adapterConfig: {}, + runtimeConfig: { + heartbeat: { + wakeOnDemand: true, + maxConcurrentRuns: 1, + }, + }, + permissions: {}, + }, + ]); + + await db.insert(heartbeatRuns).values({ + id: sourceRunId, + companyId, + agentId: oldAgentId, + invocationSource: "assignment", + triggerDetail: "system", + status: "failed", + error: "upstream overload", + errorCode: "adapter_failed", + finishedAt: now, + contextSnapshot: { + issueId, + wakeReason: "issue_assigned", + }, + updatedAt: now, + createdAt: now, + }); + + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Retry promotion reassignment", + status: "todo", + priority: "medium", + assigneeAgentId: oldAgentId, + 
executionRunId: sourceRunId, + executionAgentNameKey: "claudecoder", + executionLockedAt: now, + issueNumber: 1, + identifier: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-2`, + }); + + const scheduled = await heartbeat.scheduleBoundedRetry(sourceRunId, { + now, + random: () => 0.5, + }); + expect(scheduled.outcome).toBe("scheduled"); + if (scheduled.outcome !== "scheduled") return; + + await db.update(issues).set({ + assigneeAgentId: newAgentId, + updatedAt: now, + }).where(eq(issues.id, issueId)); + + const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt); + expect(promotion).toEqual({ promoted: 0, runIds: [] }); + + const oldRetry = await db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, scheduled.run.id)) + .then((rows) => rows[0] ?? null); + expect(oldRetry).toEqual({ + status: "cancelled", + errorCode: "issue_reassigned", + }); + + const issue = await db + .select({ executionRunId: issues.executionRunId }) + .from(issues) + .where(eq(issues.id, issueId)) + .then((rows) => rows[0] ?? 
null); + expect(issue?.executionRunId).toBeNull(); + }); + + it("does not promote a scheduled retry after the issue is cancelled", async () => { + const companyId = randomUUID(); + const agentId = randomUUID(); + const issueId = randomUUID(); + const sourceRunId = randomUUID(); + const now = new Date("2026-04-20T15:00:00.000Z"); + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(agents).values({ + id: agentId, + companyId, + name: "CodexCoder", + role: "engineer", + status: "active", + adapterType: "codex_local", + adapterConfig: {}, + runtimeConfig: { + heartbeat: { + wakeOnDemand: true, + maxConcurrentRuns: 1, + }, + }, + permissions: {}, + }); + + await db.insert(heartbeatRuns).values({ + id: sourceRunId, + companyId, + agentId, + invocationSource: "assignment", + triggerDetail: "system", + status: "failed", + error: "upstream overload", + errorCode: "adapter_failed", + finishedAt: now, + contextSnapshot: { + issueId, + wakeReason: "issue_assigned", + }, + updatedAt: now, + createdAt: now, + }); + + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Retry promotion cancellation", + status: "todo", + priority: "medium", + assigneeAgentId: agentId, + executionRunId: sourceRunId, + executionAgentNameKey: "codexcoder", + executionLockedAt: now, + issueNumber: 1, + identifier: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-3`, + }); + + const scheduled = await heartbeat.scheduleBoundedRetry(sourceRunId, { + now, + random: () => 0.5, + }); + expect(scheduled.outcome).toBe("scheduled"); + if (scheduled.outcome !== "scheduled") return; + + await db.update(issues).set({ + status: "cancelled", + updatedAt: now, + }).where(eq(issues.id, issueId)); + + const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt); + expect(promotion).toEqual({ promoted: 0, 
runIds: [] }); + + const oldRetry = await db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, scheduled.run.id)) + .then((rows) => rows[0] ?? null); + expect(oldRetry).toEqual({ + status: "cancelled", + errorCode: "issue_cancelled", + }); + + const issue = await db + .select({ executionRunId: issues.executionRunId }) + .from(issues) + .where(eq(issues.id, issueId)) + .then((rows) => rows[0] ?? null); + expect(issue?.executionRunId).toBeNull(); + }); + it("exhausts bounded retries after the hard cap", async () => { const companyId = randomUUID(); const agentId = randomUUID(); diff --git a/server/src/__tests__/issue-comment-reopen-routes.test.ts b/server/src/__tests__/issue-comment-reopen-routes.test.ts index 7709274b..3cca0371 100644 --- a/server/src/__tests__/issue-comment-reopen-routes.test.ts +++ b/server/src/__tests__/issue-comment-reopen-routes.test.ts @@ -957,6 +957,73 @@ describe.sequential("issue comment reopen routes", () => { ); }); + it("cancels an active run when an issue is marked cancelled", async () => { + const issue = { + ...makeIssue("in_progress"), + executionRunId: "run-1", + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.update.mockImplementation(async (_id: string, patch: Record<string, unknown>) => ({ + ...issue, + ...patch, + })); + mockHeartbeatService.getRun.mockResolvedValue({ + id: "run-1", + companyId: "company-1", + agentId: "22222222-2222-4222-8222-222222222222", + status: "running", + }); + mockHeartbeatService.cancelRun.mockResolvedValue({ + id: "run-1", + companyId: "company-1", + agentId: "22222222-2222-4222-8222-222222222222", + status: "cancelled", + }); + + const res = await request(await installActor(createApp())) + .patch("/api/issues/11111111-1111-4111-8111-111111111111") + .send({ status: "cancelled" }); + + expect(res.status).toBe(200); + expect(mockHeartbeatService.getRun).toHaveBeenCalledWith("run-1"); + 
expect(mockHeartbeatService.cancelRun).toHaveBeenCalledWith("run-1"); + expect(mockLogActivity).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + action: "heartbeat.cancelled", + details: expect.objectContaining({ + source: "issue_status_cancelled", + issueId: "11111111-1111-4111-8111-111111111111", + }), + }), + ); + }); + + it("does not cancel active runs when an issue is marked done", async () => { + const issue = { + ...makeIssue("in_progress"), + executionRunId: "run-1", + }; + mockIssueService.getById.mockResolvedValue(issue); + mockIssueService.update.mockImplementation(async (_id: string, patch: Record<string, unknown>) => ({ + ...issue, + ...patch, + })); + mockHeartbeatService.getRun.mockResolvedValue({ + id: "run-1", + companyId: "company-1", + agentId: "22222222-2222-4222-8222-222222222222", + status: "running", + }); + + const res = await request(await installActor(createApp())) + .patch("/api/issues/11111111-1111-4111-8111-111111111111") + .send({ status: "done" }); + + expect(res.status).toBe(200); + expect(mockHeartbeatService.cancelRun).not.toHaveBeenCalled(); + }); + it("writes decision ids into executionState and inserts the decision inside the transaction", async () => { const policy = await normalizePolicy({ stages: [ diff --git a/server/src/routes/issues.ts b/server/src/routes/issues.ts index 15432c1a..4458f382 100644 --- a/server/src/routes/issues.ts +++ b/server/src/routes/issues.ts @@ -1910,6 +1910,8 @@ export function issueRoutes( hiddenAt: hiddenAtRaw, ...updateFields } = req.body; + const shouldCancelActiveRunForCancelledStatus = + existing.status !== "cancelled" && updateFields.status === "cancelled"; if (resumeRequested === true && !commentBody) { res.status(400).json({ error: "Follow-up intent requires a comment" }); return; @@ -1982,6 +1984,10 @@ export function issueRoutes( } } + const runToCancelForCancelledStatus = shouldCancelActiveRunForCancelledStatus + ? 
await resolveActiveIssueRun(existing) + : null; + if (hiddenAtRaw !== undefined) { updateFields.hiddenAt = hiddenAtRaw ? new Date(hiddenAtRaw) : null; } @@ -2134,6 +2140,41 @@ export function issueRoutes( res.status(404).json({ error: "Issue not found" }); return; } + + let cancelledStatusRunId: string | null = null; + if (runToCancelForCancelledStatus) { + try { + const cancelled = await heartbeat.cancelRun(runToCancelForCancelledStatus.id); + if (cancelled) { + cancelledStatusRunId = cancelled.id; + await logActivity(db, { + companyId: cancelled.companyId, + actorType: actor.actorType, + actorId: actor.actorId, + agentId: actor.agentId, + runId: actor.runId, + action: "heartbeat.cancelled", + entityType: "heartbeat_run", + entityId: cancelled.id, + details: { agentId: cancelled.agentId, source: "issue_status_cancelled", issueId: existing.id }, + }); + } + } catch (err) { + logger.warn({ err, issueId: existing.id, runId: runToCancelForCancelledStatus.id }, "failed to cancel run for cancelled issue"); + await logActivity(db, { + companyId: existing.companyId, + actorType: actor.actorType, + actorId: actor.actorId, + agentId: actor.agentId, + runId: actor.runId, + action: "heartbeat.cancel_failed", + entityType: "heartbeat_run", + entityId: runToCancelForCancelledStatus.id, + details: { source: "issue_status_cancelled", issueId: existing.id }, + }); + } + } + if (titleOrDescriptionChanged) { await issueReferencesSvc.syncIssue(issue.id); } @@ -2200,6 +2241,7 @@ export function issueRoutes( ...(resumeRequested === true ? { resumeIntent: true, followUpRequested: true } : {}), ...(reopened ? { reopened: true, reopenedFrom: reopenFromStatus } : {}), ...(interruptedRunId ? { interruptedRunId } : {}), + ...(cancelledStatusRunId ? { cancelledStatusRunId } : {}), _previous: hasFieldChanges ? 
previous : undefined, ...summarizeIssueReferenceActivityDetails( updateReferenceDiff diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 57a4f62f..0d2c715f 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -3545,6 +3545,90 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) const promotedRunIds: string[] = []; for (const dueRun of dueRuns) { + const dueRunIssueId = readNonEmptyString(parseObject(dueRun.contextSnapshot).issueId); + if (dueRunIssueId) { + const issue = await db + .select({ + id: issues.id, + status: issues.status, + assigneeAgentId: issues.assigneeAgentId, + executionRunId: issues.executionRunId, + }) + .from(issues) + .where(and(eq(issues.id, dueRunIssueId), eq(issues.companyId, dueRun.companyId))) + .then((rows) => rows[0] ?? null); + + if (issue && (issue.assigneeAgentId !== dueRun.agentId || issue.status === "cancelled")) { + const issueCancelled = issue.status === "cancelled"; + const reason = issueCancelled + ? "Cancelled because the issue was cancelled before the scheduled retry became due" + : "Cancelled because the issue was reassigned before the scheduled retry became due"; + const cancelled = await db + .update(heartbeatRuns) + .set({ + status: "cancelled", + finishedAt: now, + error: reason, + errorCode: issueCancelled ? "issue_cancelled" : "issue_reassigned", + updatedAt: now, + }) + .where( + and( + eq(heartbeatRuns.id, dueRun.id), + eq(heartbeatRuns.status, "scheduled_retry"), + lte(heartbeatRuns.scheduledRetryAt, now), + ), + ) + .returning() + .then((rows) => rows[0] ?? 
null); + + if (!cancelled) continue; + + if (cancelled.wakeupRequestId) { + await db + .update(agentWakeupRequests) + .set({ + status: "cancelled", + finishedAt: now, + error: reason, + updatedAt: now, + }) + .where(eq(agentWakeupRequests.id, cancelled.wakeupRequestId)); + } + + if (issue.executionRunId === cancelled.id) { + await db + .update(issues) + .set({ + executionRunId: null, + executionAgentNameKey: null, + executionLockedAt: null, + updatedAt: now, + }) + .where(and(eq(issues.id, issue.id), eq(issues.executionRunId, cancelled.id))); + } + + await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), { + eventType: "lifecycle", + stream: "system", + level: "warn", + message: issueCancelled + ? "Scheduled retry cancelled because issue was cancelled before it became due" + : "Scheduled retry cancelled because issue ownership changed before it became due", + payload: { + issueId: issue.id, + issueStatus: issue.status, + scheduledRetryAttempt: cancelled.scheduledRetryAttempt, + scheduledRetryAt: cancelled.scheduledRetryAt ? 
new Date(cancelled.scheduledRetryAt).toISOString() : null, + scheduledRetryReason: cancelled.scheduledRetryReason, + previousRetryAgentId: cancelled.agentId, + currentAssigneeAgentId: issue.assigneeAgentId, + }, + }); + continue; + } + } + const promoted = await db .update(heartbeatRuns) .set({ @@ -6228,6 +6312,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) .select({ id: issues.id, companyId: issues.companyId, + status: issues.status, + assigneeAgentId: issues.assigneeAgentId, executionRunId: issues.executionRunId, executionAgentNameKey: issues.executionAgentNameKey, }) @@ -6252,6 +6338,88 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) return { kind: "skipped" as const }; } + const cancelStaleScheduledRetry = async (scheduledRun: typeof heartbeatRuns.$inferSelect) => { + const issueCancelled = issue.status === "cancelled"; + if ( + scheduledRun.status !== "scheduled_retry" || + (scheduledRun.agentId === issue.assigneeAgentId && !issueCancelled) + ) { + return false; + } + + const now = new Date(); + const reason = issueCancelled + ? "Cancelled because the issue was cancelled before the scheduled retry became due" + : "Cancelled because the issue was reassigned before the scheduled retry became due"; + const cancelled = await tx + .update(heartbeatRuns) + .set({ + status: "cancelled", + finishedAt: now, + error: reason, + errorCode: issueCancelled ? "issue_cancelled" : "issue_reassigned", + updatedAt: now, + }) + .where(and(eq(heartbeatRuns.id, scheduledRun.id), eq(heartbeatRuns.status, "scheduled_retry"))) + .returning() + .then((rows) => rows[0] ?? 
null); + + if (!cancelled) return false; + + if (scheduledRun.wakeupRequestId) { + await tx + .update(agentWakeupRequests) + .set({ + status: "cancelled", + finishedAt: now, + error: reason, + updatedAt: now, + }) + .where(eq(agentWakeupRequests.id, scheduledRun.wakeupRequestId)); + } + + if (issue.executionRunId === scheduledRun.id) { + await tx + .update(issues) + .set({ + executionRunId: null, + executionAgentNameKey: null, + executionLockedAt: null, + updatedAt: now, + }) + .where(and(eq(issues.id, issue.id), eq(issues.executionRunId, scheduledRun.id))); + } + + const [eventSeq] = await tx + .select({ maxSeq: sql`max(${heartbeatRunEvents.seq})` }) + .from(heartbeatRunEvents) + .where(eq(heartbeatRunEvents.runId, cancelled.id)); + + await tx.insert(heartbeatRunEvents).values({ + companyId: cancelled.companyId, + runId: cancelled.id, + agentId: cancelled.agentId, + seq: Number(eventSeq?.maxSeq ?? 0) + 1, + eventType: "lifecycle", + stream: "system", + level: "warn", + message: issueCancelled + ? "Scheduled retry cancelled because issue was cancelled before it became due" + : "Scheduled retry cancelled because issue ownership changed before it became due", + payload: { + issueId: issue.id, + issueStatus: issue.status, + scheduledRetryAttempt: cancelled.scheduledRetryAttempt, + scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null, + scheduledRetryReason: cancelled.scheduledRetryReason, + previousRetryAgentId: cancelled.agentId, + currentAssigneeAgentId: issue.assigneeAgentId, + }, + }); + + return true; + }; + let activeExecutionRun = issue.executionRunId ? 
await tx .select() @@ -6269,6 +6437,10 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) activeExecutionRun = null; } + if (activeExecutionRun && await cancelStaleScheduledRetry(activeExecutionRun)) { + activeExecutionRun = null; + } + if (!activeExecutionRun && issue.executionRunId) { await tx .update(issues) @@ -6300,21 +6472,25 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) .then((rows) => rows[0] ?? null); if (legacyRun) { - activeExecutionRun = legacyRun; - const legacyAgent = await tx - .select({ name: agents.name }) - .from(agents) - .where(eq(agents.id, legacyRun.agentId)) - .then((rows) => rows[0] ?? null); - await tx - .update(issues) - .set({ - executionRunId: legacyRun.id, - executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name), - executionLockedAt: new Date(), - updatedAt: new Date(), - }) - .where(eq(issues.id, issue.id)); + if (await cancelStaleScheduledRetry(legacyRun)) { + activeExecutionRun = null; + } else { + activeExecutionRun = legacyRun; + const legacyAgent = await tx + .select({ name: agents.name }) + .from(agents) + .where(eq(agents.id, legacyRun.agentId)) + .then((rows) => rows[0] ?? null); + await tx + .update(issues) + .set({ + executionRunId: legacyRun.id, + executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name), + executionLockedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(issues.id, issue.id)); + } } }