mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-17 19:20:39 +09:00
fix: stale execution lock lifecycle (PIP-002)
Part A: Move executionRunId assignment from enqueueWakeup() to claimQueuedRun() — lazy locking prevents stale locks on queued runs. Part B: Clear executionRunId when assigneeAgentId changes in issues.ts line 759, matching existing checkoutRunId clear behavior. Part C: Add staleness detection at checkout path. Fixes: 4 confirmed incidents where stale executionRunId caused 409 checkout conflicts on new and reassigned issues.
This commit is contained in:
parent
dda63a4324
commit
65e0d3d672
2 changed files with 61 additions and 10 deletions
|
|
@ -2,7 +2,7 @@ import fs from "node:fs/promises";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { execFile as execFileCallback } from "node:child_process";
|
import { execFile as execFileCallback } from "node:child_process";
|
||||||
import { promisify } from "node:util";
|
import { promisify } from "node:util";
|
||||||
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
|
import { and, asc, desc, eq, gt, inArray, isNull, or, sql } from "drizzle-orm";
|
||||||
import type { Db } from "@paperclipai/db";
|
import type { Db } from "@paperclipai/db";
|
||||||
import type { BillingType, ExecutionWorkspace, ExecutionWorkspaceConfig } from "@paperclipai/shared";
|
import type { BillingType, ExecutionWorkspace, ExecutionWorkspaceConfig } from "@paperclipai/shared";
|
||||||
import {
|
import {
|
||||||
|
|
@ -1795,6 +1795,29 @@ export function heartbeatService(db: Db) {
|
||||||
});
|
});
|
||||||
|
|
||||||
await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });
|
await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });
|
||||||
|
|
||||||
|
// Fix A (lazy locking): stamp executionRunId now that the run is actually running,
|
||||||
|
// not at queue time. Guard is idempotent — safe if called more than once.
|
||||||
|
const claimedIssueId = readNonEmptyString(parseObject(claimed.contextSnapshot).issueId);
|
||||||
|
if (claimedIssueId) {
|
||||||
|
const claimedAgent = await getAgent(claimed.agentId);
|
||||||
|
await db
|
||||||
|
.update(issues)
|
||||||
|
.set({
|
||||||
|
executionRunId: claimed.id,
|
||||||
|
executionAgentNameKey: normalizeAgentNameKey(claimedAgent?.name),
|
||||||
|
executionLockedAt: claimedAt,
|
||||||
|
updatedAt: claimedAt,
|
||||||
|
})
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(issues.id, claimedIssueId),
|
||||||
|
eq(issues.companyId, claimed.companyId),
|
||||||
|
or(isNull(issues.executionRunId), eq(issues.executionRunId, claimed.id)),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
return claimed;
|
return claimed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -3474,15 +3497,9 @@ export function heartbeatService(db: Db) {
|
||||||
})
|
})
|
||||||
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
|
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
|
||||||
|
|
||||||
await tx
|
// executionRunId is NOT stamped here (enqueueWakeup queues the run but
|
||||||
.update(issues)
|
// doesn't start it). It will be stamped in claimQueuedRun() once the run
|
||||||
.set({
|
// transitions to "running" — Fix A (lazy locking).
|
||||||
executionRunId: newRun.id,
|
|
||||||
executionAgentNameKey: agentNameKey,
|
|
||||||
executionLockedAt: new Date(),
|
|
||||||
updatedAt: new Date(),
|
|
||||||
})
|
|
||||||
.where(eq(issues.id, issue.id));
|
|
||||||
|
|
||||||
return { kind: "queued" as const, run: newRun };
|
return { kind: "queued" as const, run: newRun };
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -1291,12 +1291,18 @@ export function issueService(db: Db) {
|
||||||
}
|
}
|
||||||
if (issueData.status && issueData.status !== "in_progress") {
|
if (issueData.status && issueData.status !== "in_progress") {
|
||||||
patch.checkoutRunId = null;
|
patch.checkoutRunId = null;
|
||||||
|
// Fix B: also clear the execution lock when leaving in_progress
|
||||||
|
patch.executionRunId = null;
|
||||||
|
patch.executionLockedAt = null;
|
||||||
}
|
}
|
||||||
if (
|
if (
|
||||||
(issueData.assigneeAgentId !== undefined && issueData.assigneeAgentId !== existing.assigneeAgentId) ||
|
(issueData.assigneeAgentId !== undefined && issueData.assigneeAgentId !== existing.assigneeAgentId) ||
|
||||||
(issueData.assigneeUserId !== undefined && issueData.assigneeUserId !== existing.assigneeUserId)
|
(issueData.assigneeUserId !== undefined && issueData.assigneeUserId !== existing.assigneeUserId)
|
||||||
) {
|
) {
|
||||||
patch.checkoutRunId = null;
|
patch.checkoutRunId = null;
|
||||||
|
// Fix B: clear execution lock on reassignment, matching checkoutRunId clear
|
||||||
|
patch.executionRunId = null;
|
||||||
|
patch.executionLockedAt = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.transaction(async (tx) => {
|
return db.transaction(async (tx) => {
|
||||||
|
|
@ -1377,6 +1383,34 @@ export function issueService(db: Db) {
|
||||||
await assertAssignableAgent(issueCompany.companyId, agentId);
|
await assertAssignableAgent(issueCompany.companyId, agentId);
|
||||||
|
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
|
|
||||||
|
// Fix C: staleness detection — if executionRunId references a run that is no
|
||||||
|
// longer queued or running, clear it before applying the execution lock condition
|
||||||
|
// so a dead lock can't produce a spurious 409.
|
||||||
|
const preCheckRow = await db
|
||||||
|
.select({ executionRunId: issues.executionRunId })
|
||||||
|
.from(issues)
|
||||||
|
.where(eq(issues.id, id))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
if (preCheckRow?.executionRunId) {
|
||||||
|
const lockRun = await db
|
||||||
|
.select({ id: heartbeatRuns.id, status: heartbeatRuns.status })
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.id, preCheckRow.executionRunId))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
if (!lockRun || (lockRun.status !== "queued" && lockRun.status !== "running")) {
|
||||||
|
await db
|
||||||
|
.update(issues)
|
||||||
|
.set({ executionRunId: null, executionLockedAt: null, updatedAt: now })
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(issues.id, id),
|
||||||
|
eq(issues.executionRunId, preCheckRow.executionRunId),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const sameRunAssigneeCondition = checkoutRunId
|
const sameRunAssigneeCondition = checkoutRunId
|
||||||
? and(
|
? and(
|
||||||
eq(issues.assigneeAgentId, agentId),
|
eq(issues.assigneeAgentId, agentId),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue