Merge pull request #2643 from chrisschwer/fix/stale-execution-lock-lifecycle

fix: stale execution lock lifecycle (PIP-002)
This commit is contained in:
Dotta 2026-04-07 22:55:53 -05:00 committed by GitHub
commit 943b851a5e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 69 additions and 10 deletions

View file

@ -2,7 +2,7 @@ import fs from "node:fs/promises";
import path from "node:path";
import { execFile as execFileCallback } from "node:child_process";
import { promisify } from "node:util";
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
import { and, asc, desc, eq, gt, inArray, isNull, or, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import type { BillingType, ExecutionWorkspace, ExecutionWorkspaceConfig } from "@paperclipai/shared";
import {
@ -2214,6 +2214,29 @@ export function heartbeatService(db: Db) {
});
await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });
// Fix A (lazy locking): stamp executionRunId now that the run is actually running,
// not at queue time. Guard is idempotent — safe if called more than once.
const claimedIssueId = readNonEmptyString(parseObject(claimed.contextSnapshot).issueId);
if (claimedIssueId) {
const claimedAgent = await getAgent(claimed.agentId);
await db
.update(issues)
.set({
executionRunId: claimed.id,
executionAgentNameKey: normalizeAgentNameKey(claimedAgent?.name),
executionLockedAt: claimedAt,
updatedAt: claimedAt,
})
.where(
and(
eq(issues.id, claimedIssueId),
eq(issues.companyId, claimed.companyId),
or(isNull(issues.executionRunId), eq(issues.executionRunId, claimed.id)),
),
);
}
return claimed;
}
@ -3940,15 +3963,9 @@ export function heartbeatService(db: Db) {
})
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
await tx
.update(issues)
.set({
executionRunId: newRun.id,
executionAgentNameKey: agentNameKey,
executionLockedAt: new Date(),
updatedAt: new Date(),
})
.where(eq(issues.id, issue.id));
// executionRunId is NOT stamped here (enqueueWakeup queues the run but
// doesn't start it). It will be stamped in claimQueuedRun() once the run
// transitions to "running" — Fix A (lazy locking).
return { kind: "queued" as const, run: newRun };
});

View file

@ -1632,12 +1632,20 @@ export function issueService(db: Db) {
}
if (issueData.status && issueData.status !== "in_progress") {
patch.checkoutRunId = null;
// Fix B: also clear the execution lock when leaving in_progress
patch.executionRunId = null;
patch.executionAgentNameKey = null;
patch.executionLockedAt = null;
}
if (
(issueData.assigneeAgentId !== undefined && issueData.assigneeAgentId !== existing.assigneeAgentId) ||
(issueData.assigneeUserId !== undefined && issueData.assigneeUserId !== existing.assigneeUserId)
) {
patch.checkoutRunId = null;
// Fix B: clear execution lock on reassignment, matching checkoutRunId clear
patch.executionRunId = null;
patch.executionAgentNameKey = null;
patch.executionLockedAt = null;
}
const runUpdate = async (tx: any) => {
@ -1732,6 +1740,40 @@ export function issueService(db: Db) {
await assertAssignableAgent(issueCompany.companyId, agentId);
const now = new Date();
// Fix C: staleness detection — if executionRunId references a run that is no
// longer queued or running, clear it before applying the execution lock condition
// so a dead lock can't produce a spurious 409.
// Wrapped in a transaction with SELECT FOR UPDATE to make the read + clear atomic,
// matching the existing pattern in enqueueWakeup().
await db.transaction(async (tx) => {
await tx.execute(
sql`select id from issues where id = ${id} for update`,
);
const preCheckRow = await tx
.select({ executionRunId: issues.executionRunId })
.from(issues)
.where(eq(issues.id, id))
.then((rows) => rows[0] ?? null);
if (!preCheckRow?.executionRunId) return;
const lockRun = await tx
.select({ id: heartbeatRuns.id, status: heartbeatRuns.status })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, preCheckRow.executionRunId))
.then((rows) => rows[0] ?? null);
if (!lockRun || (lockRun.status !== "queued" && lockRun.status !== "running")) {
await tx
.update(issues)
.set({ executionRunId: null, executionAgentNameKey: null, executionLockedAt: null, updatedAt: now })
.where(
and(
eq(issues.id, id),
eq(issues.executionRunId, preCheckRow.executionRunId),
),
);
}
});
const sameRunAssigneeCondition = checkoutRunId
? and(
eq(issues.assigneeAgentId, agentId),