diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 4492c84a..2c2bf066 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { execFile as execFileCallback } from "node:child_process"; import { promisify } from "node:util"; -import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm"; +import { and, asc, desc, eq, gt, inArray, isNull, or, sql } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import type { BillingType, ExecutionWorkspace, ExecutionWorkspaceConfig } from "@paperclipai/shared"; import { @@ -2214,6 +2214,29 @@ export function heartbeatService(db: Db) { }); await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt }); + + // Fix A (lazy locking): stamp executionRunId now that the run is actually running, + // not at queue time. Guard is idempotent — safe if called more than once. + const claimedIssueId = readNonEmptyString(parseObject(claimed.contextSnapshot).issueId); + if (claimedIssueId) { + const claimedAgent = await getAgent(claimed.agentId); + await db + .update(issues) + .set({ + executionRunId: claimed.id, + executionAgentNameKey: normalizeAgentNameKey(claimedAgent?.name), + executionLockedAt: claimedAt, + updatedAt: claimedAt, + }) + .where( + and( + eq(issues.id, claimedIssueId), + eq(issues.companyId, claimed.companyId), + or(isNull(issues.executionRunId), eq(issues.executionRunId, claimed.id)), + ), + ); + } + return claimed; } @@ -3940,15 +3963,9 @@ export function heartbeatService(db: Db) { }) .where(eq(agentWakeupRequests.id, wakeupRequest.id)); - await tx - .update(issues) - .set({ - executionRunId: newRun.id, - executionAgentNameKey: agentNameKey, - executionLockedAt: new Date(), - updatedAt: new Date(), - }) - .where(eq(issues.id, issue.id)); + // executionRunId is NOT stamped here (enqueueWakeup queues the run but + // doesn't start it). It will be stamped in claimQueuedRun() once the run + // transitions to "running" — Fix A (lazy locking). return { kind: "queued" as const, run: newRun }; }); diff --git a/server/src/services/issues.ts b/server/src/services/issues.ts index 21c340f4..bb40be79 100644 --- a/server/src/services/issues.ts +++ b/server/src/services/issues.ts @@ -1632,12 +1632,20 @@ export function issueService(db: Db) { } if (issueData.status && issueData.status !== "in_progress") { patch.checkoutRunId = null; + // Fix B: also clear the execution lock when leaving in_progress + patch.executionRunId = null; + patch.executionAgentNameKey = null; + patch.executionLockedAt = null; } if ( (issueData.assigneeAgentId !== undefined && issueData.assigneeAgentId !== existing.assigneeAgentId) || (issueData.assigneeUserId !== undefined && issueData.assigneeUserId !== existing.assigneeUserId) ) { patch.checkoutRunId = null; + // Fix B: clear execution lock on reassignment, matching checkoutRunId clear + patch.executionRunId = null; + patch.executionAgentNameKey = null; + patch.executionLockedAt = null; } const runUpdate = async (tx: any) => { @@ -1732,6 +1740,40 @@ export function issueService(db: Db) { await assertAssignableAgent(issueCompany.companyId, agentId); const now = new Date(); + + // Fix C: staleness detection — if executionRunId references a run that is no + // longer queued or running, clear it before applying the execution lock condition + // so a dead lock can't produce a spurious 409. + // Wrapped in a transaction with SELECT FOR UPDATE to make the read + clear atomic, + // matching the existing pattern in enqueueWakeup(). + await db.transaction(async (tx) => { + await tx.execute( + sql`select id from issues where id = ${id} for update`, + ); + const preCheckRow = await tx + .select({ executionRunId: issues.executionRunId }) + .from(issues) + .where(eq(issues.id, id)) + .then((rows) => rows[0] ?? null); + if (!preCheckRow?.executionRunId) return; + const lockRun = await tx + .select({ id: heartbeatRuns.id, status: heartbeatRuns.status }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, preCheckRow.executionRunId)) + .then((rows) => rows[0] ?? null); + if (!lockRun || (lockRun.status !== "queued" && lockRun.status !== "running")) { + await tx + .update(issues) + .set({ executionRunId: null, executionAgentNameKey: null, executionLockedAt: null, updatedAt: now }) + .where( + and( + eq(issues.id, id), + eq(issues.executionRunId, preCheckRow.executionRunId), + ), + ); + } + }); + const sameRunAssigneeCondition = checkoutRunId ? and( eq(issues.assigneeAgentId, agentId),