mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-14 01:50:39 +09:00
Cancel stale retries when issue ownership changes (#4445)
## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies > - Issue execution is guarded by run locks and bounded retry scheduling > - A failed run can schedule a retry, but the issue may be reassigned before that retry becomes due > - The old assignee's scheduled retry should not continue to hold or reclaim execution for the issue > - This pull request cancels stale scheduled retries when ownership changes and cancels live work when an issue is explicitly cancelled > - The benefit is cleaner issue handoff semantics and fewer stranded or incorrect execution locks ## What Changed - Cancel scheduled retry runs when their issue has been reassigned before the retry is promoted. - Clear stale issue execution locks and cancel the associated wakeup request when a stale retry is cancelled. - Avoid deferring a new assignee behind a previous assignee's scheduled retry. - Cancel an active run when an issue status is explicitly changed to `cancelled`, while leaving `done` transitions alone. - Added route and heartbeat regressions for reassignment and cancellation behavior. ## Verification - `pnpm exec vitest run --project @paperclipai/server server/src/__tests__/heartbeat-retry-scheduling.test.ts server/src/__tests__/issue-comment-reopen-routes.test.ts --pool=forks --poolOptions.forks.isolate=true` - `issue-comment-reopen-routes.test.ts`: 28 passed. - `heartbeat-retry-scheduling.test.ts`: skipped by the existing embedded Postgres host guard (`Postgres init script exited with code null`). - `pnpm --filter @paperclipai/server typecheck` ## Risks - Medium risk because this changes heartbeat retry lifecycle behavior. - The cancellation path is scoped to scheduled retries whose issue assignee no longer matches the retrying agent, and logs a lifecycle event for auditability. - No migrations. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex, GPT-5 coding agent, tool-enabled with shell/GitHub/Paperclip API access. Context window was not reported by the runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
0c6961a03e
commit
6916e30f8e
4 changed files with 676 additions and 15 deletions
|
|
@ -3,11 +3,14 @@ import { eq, sql } from "drizzle-orm";
|
|||
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
|
||||
import {
|
||||
agents,
|
||||
agentRuntimeState,
|
||||
agentWakeupRequests,
|
||||
companies,
|
||||
createDb,
|
||||
environmentLeases,
|
||||
heartbeatRunEvents,
|
||||
heartbeatRuns,
|
||||
issues,
|
||||
} from "@paperclipai/db";
|
||||
import {
|
||||
getEmbeddedPostgresTestSupport,
|
||||
|
|
@ -40,8 +43,11 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => {
|
|||
|
||||
afterEach(async () => {
|
||||
await db.delete(heartbeatRunEvents);
|
||||
await db.delete(environmentLeases);
|
||||
await db.delete(issues);
|
||||
await db.delete(heartbeatRuns);
|
||||
await db.delete(agentWakeupRequests);
|
||||
await db.delete(agentRuntimeState);
|
||||
await db.delete(agents);
|
||||
await db.delete(companies);
|
||||
});
|
||||
|
|
@ -212,6 +218,376 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => {
|
|||
expect(promotedRun?.status).toBe("queued");
|
||||
});
|
||||
|
||||
it("does not defer a new assignee behind the previous assignee's scheduled retry", async () => {
|
||||
const companyId = randomUUID();
|
||||
const oldAgentId = randomUUID();
|
||||
const newAgentId = randomUUID();
|
||||
const issueId = randomUUID();
|
||||
const sourceRunId = randomUUID();
|
||||
const now = new Date("2026-04-20T13:00:00.000Z");
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
|
||||
await db.insert(agents).values([
|
||||
{
|
||||
id: oldAgentId,
|
||||
companyId,
|
||||
name: "ClaudeCoder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "claude_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
wakeOnDemand: true,
|
||||
maxConcurrentRuns: 1,
|
||||
},
|
||||
},
|
||||
permissions: {},
|
||||
},
|
||||
{
|
||||
id: newAgentId,
|
||||
companyId,
|
||||
name: "CodexCoder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "codex_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
wakeOnDemand: true,
|
||||
maxConcurrentRuns: 1,
|
||||
},
|
||||
},
|
||||
permissions: {},
|
||||
},
|
||||
]);
|
||||
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: sourceRunId,
|
||||
companyId,
|
||||
agentId: oldAgentId,
|
||||
invocationSource: "assignment",
|
||||
triggerDetail: "system",
|
||||
status: "failed",
|
||||
error: "upstream overload",
|
||||
errorCode: "adapter_failed",
|
||||
finishedAt: now,
|
||||
contextSnapshot: {
|
||||
issueId,
|
||||
wakeReason: "issue_assigned",
|
||||
},
|
||||
updatedAt: now,
|
||||
createdAt: now,
|
||||
});
|
||||
|
||||
await db.insert(issues).values({
|
||||
id: issueId,
|
||||
companyId,
|
||||
title: "Retry reassignment",
|
||||
status: "todo",
|
||||
priority: "medium",
|
||||
assigneeAgentId: oldAgentId,
|
||||
executionRunId: sourceRunId,
|
||||
executionAgentNameKey: "claudecoder",
|
||||
executionLockedAt: now,
|
||||
issueNumber: 1,
|
||||
identifier: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-1`,
|
||||
});
|
||||
|
||||
const scheduled = await heartbeat.scheduleBoundedRetry(sourceRunId, {
|
||||
now,
|
||||
random: () => 0.5,
|
||||
});
|
||||
expect(scheduled.outcome).toBe("scheduled");
|
||||
if (scheduled.outcome !== "scheduled") return;
|
||||
|
||||
await db.update(issues).set({
|
||||
assigneeAgentId: newAgentId,
|
||||
updatedAt: now,
|
||||
}).where(eq(issues.id, issueId));
|
||||
|
||||
// Keep the new agent's queue from auto-claiming/executing during this unit test.
|
||||
await db.insert(heartbeatRuns).values(
|
||||
Array.from({ length: 5 }, () => ({
|
||||
id: randomUUID(),
|
||||
companyId,
|
||||
agentId: newAgentId,
|
||||
invocationSource: "automation",
|
||||
triggerDetail: "system",
|
||||
status: "running",
|
||||
contextSnapshot: {
|
||||
wakeReason: "test_busy_slot",
|
||||
},
|
||||
startedAt: now,
|
||||
updatedAt: now,
|
||||
createdAt: now,
|
||||
})),
|
||||
);
|
||||
|
||||
const newAssigneeRun = await heartbeat.wakeup(newAgentId, {
|
||||
source: "assignment",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_assigned",
|
||||
payload: {
|
||||
issueId,
|
||||
mutation: "update",
|
||||
},
|
||||
contextSnapshot: {
|
||||
issueId,
|
||||
source: "issue.update",
|
||||
},
|
||||
requestedByActorType: "user",
|
||||
requestedByActorId: "local-board",
|
||||
});
|
||||
|
||||
expect(newAssigneeRun).not.toBeNull();
|
||||
expect(newAssigneeRun?.agentId).toBe(newAgentId);
|
||||
expect(newAssigneeRun?.status).toBe("queued");
|
||||
|
||||
const oldRetry = await db
|
||||
.select({
|
||||
status: heartbeatRuns.status,
|
||||
errorCode: heartbeatRuns.errorCode,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.id, scheduled.run.id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
expect(oldRetry).toEqual({
|
||||
status: "cancelled",
|
||||
errorCode: "issue_reassigned",
|
||||
});
|
||||
|
||||
const deferredWakeups = await db
|
||||
.select({ count: sql<number>`count(*)::int` })
|
||||
.from(agentWakeupRequests)
|
||||
.where(eq(agentWakeupRequests.status, "deferred_issue_execution"))
|
||||
.then((rows) => rows[0]?.count ?? 0);
|
||||
expect(deferredWakeups).toBe(0);
|
||||
});
|
||||
|
||||
it("does not promote a scheduled retry after issue ownership changes", async () => {
|
||||
const companyId = randomUUID();
|
||||
const oldAgentId = randomUUID();
|
||||
const newAgentId = randomUUID();
|
||||
const issueId = randomUUID();
|
||||
const sourceRunId = randomUUID();
|
||||
const now = new Date("2026-04-20T14:00:00.000Z");
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
|
||||
await db.insert(agents).values([
|
||||
{
|
||||
id: oldAgentId,
|
||||
companyId,
|
||||
name: "ClaudeCoder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "claude_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
wakeOnDemand: true,
|
||||
maxConcurrentRuns: 1,
|
||||
},
|
||||
},
|
||||
permissions: {},
|
||||
},
|
||||
{
|
||||
id: newAgentId,
|
||||
companyId,
|
||||
name: "CodexCoder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "codex_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
wakeOnDemand: true,
|
||||
maxConcurrentRuns: 1,
|
||||
},
|
||||
},
|
||||
permissions: {},
|
||||
},
|
||||
]);
|
||||
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: sourceRunId,
|
||||
companyId,
|
||||
agentId: oldAgentId,
|
||||
invocationSource: "assignment",
|
||||
triggerDetail: "system",
|
||||
status: "failed",
|
||||
error: "upstream overload",
|
||||
errorCode: "adapter_failed",
|
||||
finishedAt: now,
|
||||
contextSnapshot: {
|
||||
issueId,
|
||||
wakeReason: "issue_assigned",
|
||||
},
|
||||
updatedAt: now,
|
||||
createdAt: now,
|
||||
});
|
||||
|
||||
await db.insert(issues).values({
|
||||
id: issueId,
|
||||
companyId,
|
||||
title: "Retry promotion reassignment",
|
||||
status: "todo",
|
||||
priority: "medium",
|
||||
assigneeAgentId: oldAgentId,
|
||||
executionRunId: sourceRunId,
|
||||
executionAgentNameKey: "claudecoder",
|
||||
executionLockedAt: now,
|
||||
issueNumber: 1,
|
||||
identifier: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-2`,
|
||||
});
|
||||
|
||||
const scheduled = await heartbeat.scheduleBoundedRetry(sourceRunId, {
|
||||
now,
|
||||
random: () => 0.5,
|
||||
});
|
||||
expect(scheduled.outcome).toBe("scheduled");
|
||||
if (scheduled.outcome !== "scheduled") return;
|
||||
|
||||
await db.update(issues).set({
|
||||
assigneeAgentId: newAgentId,
|
||||
updatedAt: now,
|
||||
}).where(eq(issues.id, issueId));
|
||||
|
||||
const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt);
|
||||
expect(promotion).toEqual({ promoted: 0, runIds: [] });
|
||||
|
||||
const oldRetry = await db
|
||||
.select({
|
||||
status: heartbeatRuns.status,
|
||||
errorCode: heartbeatRuns.errorCode,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.id, scheduled.run.id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
expect(oldRetry).toEqual({
|
||||
status: "cancelled",
|
||||
errorCode: "issue_reassigned",
|
||||
});
|
||||
|
||||
const issue = await db
|
||||
.select({ executionRunId: issues.executionRunId })
|
||||
.from(issues)
|
||||
.where(eq(issues.id, issueId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
expect(issue?.executionRunId).toBeNull();
|
||||
});
|
||||
|
||||
it("does not promote a scheduled retry after the issue is cancelled", async () => {
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
const issueId = randomUUID();
|
||||
const sourceRunId = randomUUID();
|
||||
const now = new Date("2026-04-20T15:00:00.000Z");
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
|
||||
await db.insert(agents).values({
|
||||
id: agentId,
|
||||
companyId,
|
||||
name: "CodexCoder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "codex_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {
|
||||
heartbeat: {
|
||||
wakeOnDemand: true,
|
||||
maxConcurrentRuns: 1,
|
||||
},
|
||||
},
|
||||
permissions: {},
|
||||
});
|
||||
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: sourceRunId,
|
||||
companyId,
|
||||
agentId,
|
||||
invocationSource: "assignment",
|
||||
triggerDetail: "system",
|
||||
status: "failed",
|
||||
error: "upstream overload",
|
||||
errorCode: "adapter_failed",
|
||||
finishedAt: now,
|
||||
contextSnapshot: {
|
||||
issueId,
|
||||
wakeReason: "issue_assigned",
|
||||
},
|
||||
updatedAt: now,
|
||||
createdAt: now,
|
||||
});
|
||||
|
||||
await db.insert(issues).values({
|
||||
id: issueId,
|
||||
companyId,
|
||||
title: "Retry promotion cancellation",
|
||||
status: "todo",
|
||||
priority: "medium",
|
||||
assigneeAgentId: agentId,
|
||||
executionRunId: sourceRunId,
|
||||
executionAgentNameKey: "codexcoder",
|
||||
executionLockedAt: now,
|
||||
issueNumber: 1,
|
||||
identifier: `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-3`,
|
||||
});
|
||||
|
||||
const scheduled = await heartbeat.scheduleBoundedRetry(sourceRunId, {
|
||||
now,
|
||||
random: () => 0.5,
|
||||
});
|
||||
expect(scheduled.outcome).toBe("scheduled");
|
||||
if (scheduled.outcome !== "scheduled") return;
|
||||
|
||||
await db.update(issues).set({
|
||||
status: "cancelled",
|
||||
updatedAt: now,
|
||||
}).where(eq(issues.id, issueId));
|
||||
|
||||
const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt);
|
||||
expect(promotion).toEqual({ promoted: 0, runIds: [] });
|
||||
|
||||
const oldRetry = await db
|
||||
.select({
|
||||
status: heartbeatRuns.status,
|
||||
errorCode: heartbeatRuns.errorCode,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.id, scheduled.run.id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
expect(oldRetry).toEqual({
|
||||
status: "cancelled",
|
||||
errorCode: "issue_cancelled",
|
||||
});
|
||||
|
||||
const issue = await db
|
||||
.select({ executionRunId: issues.executionRunId })
|
||||
.from(issues)
|
||||
.where(eq(issues.id, issueId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
expect(issue?.executionRunId).toBeNull();
|
||||
});
|
||||
|
||||
it("exhausts bounded retries after the hard cap", async () => {
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
|
|
|
|||
|
|
@ -957,6 +957,73 @@ describe.sequential("issue comment reopen routes", () => {
|
|||
);
|
||||
});
|
||||
|
||||
it("cancels an active run when an issue is marked cancelled", async () => {
|
||||
const issue = {
|
||||
...makeIssue("in_progress"),
|
||||
executionRunId: "run-1",
|
||||
};
|
||||
mockIssueService.getById.mockResolvedValue(issue);
|
||||
mockIssueService.update.mockImplementation(async (_id: string, patch: Record<string, unknown>) => ({
|
||||
...issue,
|
||||
...patch,
|
||||
}));
|
||||
mockHeartbeatService.getRun.mockResolvedValue({
|
||||
id: "run-1",
|
||||
companyId: "company-1",
|
||||
agentId: "22222222-2222-4222-8222-222222222222",
|
||||
status: "running",
|
||||
});
|
||||
mockHeartbeatService.cancelRun.mockResolvedValue({
|
||||
id: "run-1",
|
||||
companyId: "company-1",
|
||||
agentId: "22222222-2222-4222-8222-222222222222",
|
||||
status: "cancelled",
|
||||
});
|
||||
|
||||
const res = await request(await installActor(createApp()))
|
||||
.patch("/api/issues/11111111-1111-4111-8111-111111111111")
|
||||
.send({ status: "cancelled" });
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(mockHeartbeatService.getRun).toHaveBeenCalledWith("run-1");
|
||||
expect(mockHeartbeatService.cancelRun).toHaveBeenCalledWith("run-1");
|
||||
expect(mockLogActivity).toHaveBeenCalledWith(
|
||||
expect.anything(),
|
||||
expect.objectContaining({
|
||||
action: "heartbeat.cancelled",
|
||||
details: expect.objectContaining({
|
||||
source: "issue_status_cancelled",
|
||||
issueId: "11111111-1111-4111-8111-111111111111",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not cancel active runs when an issue is marked done", async () => {
|
||||
const issue = {
|
||||
...makeIssue("in_progress"),
|
||||
executionRunId: "run-1",
|
||||
};
|
||||
mockIssueService.getById.mockResolvedValue(issue);
|
||||
mockIssueService.update.mockImplementation(async (_id: string, patch: Record<string, unknown>) => ({
|
||||
...issue,
|
||||
...patch,
|
||||
}));
|
||||
mockHeartbeatService.getRun.mockResolvedValue({
|
||||
id: "run-1",
|
||||
companyId: "company-1",
|
||||
agentId: "22222222-2222-4222-8222-222222222222",
|
||||
status: "running",
|
||||
});
|
||||
|
||||
const res = await request(await installActor(createApp()))
|
||||
.patch("/api/issues/11111111-1111-4111-8111-111111111111")
|
||||
.send({ status: "done" });
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(mockHeartbeatService.cancelRun).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("writes decision ids into executionState and inserts the decision inside the transaction", async () => {
|
||||
const policy = await normalizePolicy({
|
||||
stages: [
|
||||
|
|
|
|||
|
|
@ -1910,6 +1910,8 @@ export function issueRoutes(
|
|||
hiddenAt: hiddenAtRaw,
|
||||
...updateFields
|
||||
} = req.body;
|
||||
const shouldCancelActiveRunForCancelledStatus =
|
||||
existing.status !== "cancelled" && updateFields.status === "cancelled";
|
||||
if (resumeRequested === true && !commentBody) {
|
||||
res.status(400).json({ error: "Follow-up intent requires a comment" });
|
||||
return;
|
||||
|
|
@ -1982,6 +1984,10 @@ export function issueRoutes(
|
|||
}
|
||||
}
|
||||
|
||||
const runToCancelForCancelledStatus = shouldCancelActiveRunForCancelledStatus
|
||||
? await resolveActiveIssueRun(existing)
|
||||
: null;
|
||||
|
||||
if (hiddenAtRaw !== undefined) {
|
||||
updateFields.hiddenAt = hiddenAtRaw ? new Date(hiddenAtRaw) : null;
|
||||
}
|
||||
|
|
@ -2134,6 +2140,41 @@ export function issueRoutes(
|
|||
res.status(404).json({ error: "Issue not found" });
|
||||
return;
|
||||
}
|
||||
|
||||
let cancelledStatusRunId: string | null = null;
|
||||
if (runToCancelForCancelledStatus) {
|
||||
try {
|
||||
const cancelled = await heartbeat.cancelRun(runToCancelForCancelledStatus.id);
|
||||
if (cancelled) {
|
||||
cancelledStatusRunId = cancelled.id;
|
||||
await logActivity(db, {
|
||||
companyId: cancelled.companyId,
|
||||
actorType: actor.actorType,
|
||||
actorId: actor.actorId,
|
||||
agentId: actor.agentId,
|
||||
runId: actor.runId,
|
||||
action: "heartbeat.cancelled",
|
||||
entityType: "heartbeat_run",
|
||||
entityId: cancelled.id,
|
||||
details: { agentId: cancelled.agentId, source: "issue_status_cancelled", issueId: existing.id },
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn({ err, issueId: existing.id, runId: runToCancelForCancelledStatus.id }, "failed to cancel run for cancelled issue");
|
||||
await logActivity(db, {
|
||||
companyId: existing.companyId,
|
||||
actorType: actor.actorType,
|
||||
actorId: actor.actorId,
|
||||
agentId: actor.agentId,
|
||||
runId: actor.runId,
|
||||
action: "heartbeat.cancel_failed",
|
||||
entityType: "heartbeat_run",
|
||||
entityId: runToCancelForCancelledStatus.id,
|
||||
details: { source: "issue_status_cancelled", issueId: existing.id },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (titleOrDescriptionChanged) {
|
||||
await issueReferencesSvc.syncIssue(issue.id);
|
||||
}
|
||||
|
|
@ -2200,6 +2241,7 @@ export function issueRoutes(
|
|||
...(resumeRequested === true ? { resumeIntent: true, followUpRequested: true } : {}),
|
||||
...(reopened ? { reopened: true, reopenedFrom: reopenFromStatus } : {}),
|
||||
...(interruptedRunId ? { interruptedRunId } : {}),
|
||||
...(cancelledStatusRunId ? { cancelledStatusRunId } : {}),
|
||||
_previous: hasFieldChanges ? previous : undefined,
|
||||
...summarizeIssueReferenceActivityDetails(
|
||||
updateReferenceDiff
|
||||
|
|
|
|||
|
|
@ -3545,6 +3545,90 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
|
|||
const promotedRunIds: string[] = [];
|
||||
|
||||
for (const dueRun of dueRuns) {
|
||||
const dueRunIssueId = readNonEmptyString(parseObject(dueRun.contextSnapshot).issueId);
|
||||
if (dueRunIssueId) {
|
||||
const issue = await db
|
||||
.select({
|
||||
id: issues.id,
|
||||
status: issues.status,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
executionRunId: issues.executionRunId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(and(eq(issues.id, dueRunIssueId), eq(issues.companyId, dueRun.companyId)))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (issue && (issue.assigneeAgentId !== dueRun.agentId || issue.status === "cancelled")) {
|
||||
const issueCancelled = issue.status === "cancelled";
|
||||
const reason = issueCancelled
|
||||
? "Cancelled because the issue was cancelled before the scheduled retry became due"
|
||||
: "Cancelled because the issue was reassigned before the scheduled retry became due";
|
||||
const cancelled = await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
status: "cancelled",
|
||||
finishedAt: now,
|
||||
error: reason,
|
||||
errorCode: issueCancelled ? "issue_cancelled" : "issue_reassigned",
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.id, dueRun.id),
|
||||
eq(heartbeatRuns.status, "scheduled_retry"),
|
||||
lte(heartbeatRuns.scheduledRetryAt, now),
|
||||
),
|
||||
)
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!cancelled) continue;
|
||||
|
||||
if (cancelled.wakeupRequestId) {
|
||||
await db
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
status: "cancelled",
|
||||
finishedAt: now,
|
||||
error: reason,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, cancelled.wakeupRequestId));
|
||||
}
|
||||
|
||||
if (issue.executionRunId === cancelled.id) {
|
||||
await db
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: null,
|
||||
executionAgentNameKey: null,
|
||||
executionLockedAt: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(issues.id, issue.id), eq(issues.executionRunId, cancelled.id)));
|
||||
}
|
||||
|
||||
await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "warn",
|
||||
message: issueCancelled
|
||||
? "Scheduled retry cancelled because issue was cancelled before it became due"
|
||||
: "Scheduled retry cancelled because issue ownership changed before it became due",
|
||||
payload: {
|
||||
issueId: issue.id,
|
||||
issueStatus: issue.status,
|
||||
scheduledRetryAttempt: cancelled.scheduledRetryAttempt,
|
||||
scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null,
|
||||
scheduledRetryReason: cancelled.scheduledRetryReason,
|
||||
previousRetryAgentId: cancelled.agentId,
|
||||
currentAssigneeAgentId: issue.assigneeAgentId,
|
||||
},
|
||||
});
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const promoted = await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
|
|
@ -6228,6 +6312,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
|
|||
.select({
|
||||
id: issues.id,
|
||||
companyId: issues.companyId,
|
||||
status: issues.status,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
executionRunId: issues.executionRunId,
|
||||
executionAgentNameKey: issues.executionAgentNameKey,
|
||||
})
|
||||
|
|
@ -6252,6 +6338,88 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
|
|||
return { kind: "skipped" as const };
|
||||
}
|
||||
|
||||
const cancelStaleScheduledRetry = async (scheduledRun: typeof heartbeatRuns.$inferSelect) => {
|
||||
const issueCancelled = issue.status === "cancelled";
|
||||
if (
|
||||
scheduledRun.status !== "scheduled_retry" ||
|
||||
(scheduledRun.agentId === issue.assigneeAgentId && !issueCancelled)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
const reason = issueCancelled
|
||||
? "Cancelled because the issue was cancelled before the scheduled retry became due"
|
||||
: "Cancelled because the issue was reassigned before the scheduled retry became due";
|
||||
const cancelled = await tx
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
status: "cancelled",
|
||||
finishedAt: now,
|
||||
error: reason,
|
||||
errorCode: issueCancelled ? "issue_cancelled" : "issue_reassigned",
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(heartbeatRuns.id, scheduledRun.id), eq(heartbeatRuns.status, "scheduled_retry")))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!cancelled) return false;
|
||||
|
||||
if (scheduledRun.wakeupRequestId) {
|
||||
await tx
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
status: "cancelled",
|
||||
finishedAt: now,
|
||||
error: reason,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, scheduledRun.wakeupRequestId));
|
||||
}
|
||||
|
||||
if (issue.executionRunId === scheduledRun.id) {
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: null,
|
||||
executionAgentNameKey: null,
|
||||
executionLockedAt: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(eq(issues.id, issue.id), eq(issues.executionRunId, scheduledRun.id)));
|
||||
}
|
||||
|
||||
const [eventSeq] = await tx
|
||||
.select({ maxSeq: sql<number | null>`max(${heartbeatRunEvents.seq})` })
|
||||
.from(heartbeatRunEvents)
|
||||
.where(eq(heartbeatRunEvents.runId, cancelled.id));
|
||||
|
||||
await tx.insert(heartbeatRunEvents).values({
|
||||
companyId: cancelled.companyId,
|
||||
runId: cancelled.id,
|
||||
agentId: cancelled.agentId,
|
||||
seq: Number(eventSeq?.maxSeq ?? 0) + 1,
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "warn",
|
||||
message: issueCancelled
|
||||
? "Scheduled retry cancelled because issue was cancelled before it became due"
|
||||
: "Scheduled retry cancelled because issue ownership changed before it became due",
|
||||
payload: {
|
||||
issueId: issue.id,
|
||||
issueStatus: issue.status,
|
||||
scheduledRetryAttempt: cancelled.scheduledRetryAttempt,
|
||||
scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null,
|
||||
scheduledRetryReason: cancelled.scheduledRetryReason,
|
||||
previousRetryAgentId: cancelled.agentId,
|
||||
currentAssigneeAgentId: issue.assigneeAgentId,
|
||||
},
|
||||
});
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
let activeExecutionRun = issue.executionRunId
|
||||
? await tx
|
||||
.select()
|
||||
|
|
@ -6269,6 +6437,10 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
|
|||
activeExecutionRun = null;
|
||||
}
|
||||
|
||||
if (activeExecutionRun && await cancelStaleScheduledRetry(activeExecutionRun)) {
|
||||
activeExecutionRun = null;
|
||||
}
|
||||
|
||||
if (!activeExecutionRun && issue.executionRunId) {
|
||||
await tx
|
||||
.update(issues)
|
||||
|
|
@ -6300,21 +6472,25 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
|
|||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (legacyRun) {
|
||||
activeExecutionRun = legacyRun;
|
||||
const legacyAgent = await tx
|
||||
.select({ name: agents.name })
|
||||
.from(agents)
|
||||
.where(eq(agents.id, legacyRun.agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: legacyRun.id,
|
||||
executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name),
|
||||
executionLockedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
if (await cancelStaleScheduledRetry(legacyRun)) {
|
||||
activeExecutionRun = null;
|
||||
} else {
|
||||
activeExecutionRun = legacyRun;
|
||||
const legacyAgent = await tx
|
||||
.select({ name: agents.name })
|
||||
.from(agents)
|
||||
.where(eq(agents.id, legacyRun.agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: legacyRun.id,
|
||||
executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name),
|
||||
executionLockedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue