mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-19 20:10:39 +09:00
Fix continuation recovery retry streaks by failure cause (#7031)
## Thinking Path

> - Paperclip orchestrates AI agents for zero-human companies.
> - The recovery subsystem is responsible for keeping assigned work moving when a live heartbeat run disappears or fails.
> - `continuation_recovery` is the path that re-enqueues stranded `in_progress` issues after an interrupted continuation attempt.
> - That path recently gained cause-aware retry classes and transient retry caps, but the streak counter was still aggregating mixed failure causes into one retry history.
> - That meant a sequence like `timeout -> timeout -> adapter_failed -> adapter_failed` could escalate as a false `3x adapter_failed` streak even though the latest cause had only happened twice.
> - This pull request makes continuation retry streaks count only consecutive failures whose `errorCode` matches the latest run and adds a regression test for the mixed-cause case.
> - The benefit is that transient retry backoff and escalation now match the actual current failure cause instead of inheriting stale budget from unrelated failures.

## What Changed

- Updated `summarizeRecentContinuationRetries(...)` to stop counting as soon as the continuation failure cause no longer matches the latest run's `errorCode`.
- Wired the continuation recovery escalation/backoff path to pass the latest classified `errorCode` into the retry streak summarizer.
- Added a regression test proving mixed-cause continuation failures do not consume the transient retry cap for a new failure cause.

## Verification

- `pnpm exec vitest run server/src/__tests__/heartbeat-process-recovery.test.ts`

## Risks

- Low risk. The behavioral change is intentionally narrow, but any future continuation retry modes that rely on `errorCode = null` will now be counted as a separate streak bucket and should be kept in mind when adding new retry classifications.

## Model Used

- OpenAI Codex via Paperclip `codex_local` (GPT-5-based Codex coding agent; exact backend revision is not surfaced in the runtime), with tool use, shell execution, and patch application in the local repository.

## Checklist

- [x] I have included a thinking path that traces from project context to this change
- [x] I have specified the model used (with version and capability details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [ ] If this change affects the UI, I have included before/after screenshots
- [ ] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before requesting merge

---------

Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
aea35fe695
commit
911a1e8b0d
2 changed files with 427 additions and 19 deletions
|
|
@ -328,6 +328,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
|||
await db.delete(agentRuntimeState);
|
||||
await db.delete(companySkills);
|
||||
await db.delete(costEvents);
|
||||
await db.delete(workspaceOperations);
|
||||
await db.delete(environmentLeases);
|
||||
await db.delete(environments);
|
||||
await db.delete(issuePlanDecompositions);
|
||||
|
|
@ -1980,7 +1981,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
|||
});
|
||||
|
||||
it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => {
|
||||
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "todo",
|
||||
runStatus: "failed",
|
||||
});
|
||||
|
|
@ -2314,7 +2315,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
|||
});
|
||||
|
||||
it("re-enqueues continuation for stranded in-progress work with no active run", async () => {
|
||||
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
runStatus: "failed",
|
||||
});
|
||||
|
|
@ -2561,6 +2562,272 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
|||
expect(comments[0]?.body).not.toContain("- Failure: none recorded");
|
||||
});
|
||||
|
||||
it("keeps retrying transient adapter_failed continuation runs before the cap", async () => {
  // Seed an in-progress issue whose latest continuation run failed with `adapter_failed`.
  const fixture = await seedStrandedIssueFixture({
    status: "in_progress",
    runStatus: "failed",
    retryReason: "issue_continuation_needed",
    runErrorCode: "adapter_failed",
    runError: "ssh: connection reset",
  });
  const { agentId, issueId, runId } = fixture;
  const heartbeat = heartbeatService(db);

  // Reconciliation is expected to requeue the continuation rather than escalate.
  const outcome = await heartbeat.reconcileStrandedAssignedIssues();
  expect(outcome.continuationRequeued).toBe(1);
  expect(outcome.escalated).toBe(0);
  expect(outcome.issueIds).toEqual([issueId]);

  // The issue itself must stay in progress.
  const issueRows = await db.select().from(issues).where(eq(issues.id, issueId));
  const issue = issueRows[0] ?? null;
  expect(issue?.status).toBe("in_progress");

  // Exactly one new run (the retry) must exist next to the seeded failed run.
  const agentRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
  expect(agentRuns).toHaveLength(2);
  const queuedRetry = agentRuns.find((run) => run.id !== runId);
  expect(queuedRetry?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
    issueId,
    retryReason: "issue_continuation_needed",
    source: "issue.continuation_recovery",
  });

  // Let the queued retry finish before the test returns.
  if (queuedRetry !== undefined) {
    await waitForRunToSettle(heartbeat, queuedRetry.id);
  }
});
|
||||
|
||||
it("escalates after repeated adapter_failed continuation retries with the cause in the comment", async () => {
  const fixture = await seedStrandedIssueFixture({
    status: "in_progress",
    runStatus: "failed",
    retryReason: "issue_continuation_needed",
    runErrorCode: "adapter_failed",
    runError: "ssh: connection reset",
  });
  const { companyId, agentId, issueId, runId } = fixture;

  // Builds one historical failed continuation-recovery run stamped at `at`.
  const failedContinuationRun = (at: Date) => ({
    id: randomUUID(),
    companyId,
    agentId,
    invocationSource: "automation",
    triggerDetail: "system",
    status: "failed",
    contextSnapshot: {
      issueId,
      taskId: issueId,
      wakeReason: "issue_continuation_needed",
      retryReason: "issue_continuation_needed",
      source: "issue.continuation_recovery",
    },
    errorCode: "adapter_failed",
    error: "ssh: connection reset",
    startedAt: at,
    finishedAt: at,
    createdAt: at,
    updatedAt: at,
  });

  // Backfill two more consecutive failed continuation retries so the cap (3) is reached.
  const backfillTimes = ["2026-03-18T23:50:00.000Z", "2026-03-18T23:55:00.000Z"];
  for (const iso of backfillTimes) {
    await db.insert(heartbeatRuns).values(failedContinuationRun(new Date(iso)));
  }

  const heartbeat = heartbeatService(db);

  // With the streak at the cap, reconciliation must escalate instead of requeueing.
  const outcome = await heartbeat.reconcileStrandedAssignedIssues();
  expect(outcome.continuationRequeued).toBe(0);
  expect(outcome.escalated).toBe(1);
  expect(outcome.issueIds).toEqual([issueId]);

  const issueRows = await db.select().from(issues).where(eq(issues.id, issueId));
  const issue = issueRows[0] ?? null;
  expect(issue?.status).toBe("blocked");

  await expectSourceScopedStrandedRecoveryAction({
    companyId,
    agentId,
    issueId,
    runId,
    previousStatus: "in_progress",
    retryReason: "issue_continuation_needed",
  });

  // The single escalation comment must name the attempt count and the latest cause.
  const escalationComments = await db
    .select()
    .from(issueComments)
    .where(eq(issueComments.issueId, issueId));
  expect(escalationComments).toHaveLength(1);
  const commentBody = escalationComments[0]?.body;
  expect(commentBody).toContain("retried continuation");
  expect(commentBody).toContain("3× attempts");
  expect(commentBody).toContain("Latest cause: `adapter_failed`");
});
|
||||
|
||||
it("does not count mixed-cause continuation failures toward the transient cap", async () => {
  // The seeded latest run failed with `adapter_failed`.
  const fixture = await seedStrandedIssueFixture({
    status: "in_progress",
    runStatus: "failed",
    retryReason: "issue_continuation_needed",
    runErrorCode: "adapter_failed",
    runError: "ssh: connection reset",
  });
  const { companyId, agentId, issueId, runId } = fixture;

  // Older history, oldest first: two `timeout` failures, then one `adapter_failed`.
  const priorFailures = [
    { errorCode: "timeout", error: "request timed out", at: "2026-03-18T23:45:00.000Z" },
    { errorCode: "timeout", error: "request timed out", at: "2026-03-18T23:50:00.000Z" },
    { errorCode: "adapter_failed", error: "ssh: connection reset", at: "2026-03-18T23:55:00.000Z" },
  ];

  // Insert all three historical runs in a single batch.
  await db.insert(heartbeatRuns).values(
    priorFailures.map(({ errorCode, error, at }) => {
      const when = new Date(at);
      return {
        id: randomUUID(),
        companyId,
        agentId,
        invocationSource: "automation",
        triggerDetail: "system",
        status: "failed",
        contextSnapshot: {
          issueId,
          taskId: issueId,
          wakeReason: "issue_continuation_needed",
          retryReason: "issue_continuation_needed",
          source: "issue.continuation_recovery",
        },
        errorCode,
        error,
        startedAt: when,
        finishedAt: when,
        createdAt: when,
        updatedAt: when,
      };
    }),
  );

  const heartbeat = heartbeatService(db);

  // Four failures exist in total, but the test expects a requeue and no escalation.
  const outcome = await heartbeat.reconcileStrandedAssignedIssues();
  expect(outcome.continuationRequeued).toBe(1);
  expect(outcome.escalated).toBe(0);
  expect(outcome.issueIds).toEqual([issueId]);

  const issueRows = await db.select().from(issues).where(eq(issues.id, issueId));
  const issue = issueRows[0] ?? null;
  expect(issue?.status).toBe("in_progress");

  // Seeded run + three backfilled runs + the newly queued retry.
  const agentRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
  expect(agentRuns).toHaveLength(5);

  // The retry is the only continuation-recovery run that carries no error code.
  const queuedRetry = agentRuns.find((run) => {
    if (run.id === runId || run.errorCode !== null) {
      return false;
    }
    const snapshot = run.contextSnapshot as Record<string, unknown> | null;
    return (
      snapshot?.retryReason === "issue_continuation_needed" &&
      snapshot?.source === "issue.continuation_recovery"
    );
  });
  expect(queuedRetry?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
    issueId,
    retryReason: "issue_continuation_needed",
    source: "issue.continuation_recovery",
  });

  // Let the queued retry finish before the test returns.
  if (queuedRetry !== undefined) {
    await waitForRunToSettle(heartbeat, queuedRetry.id);
  }
});
|
||||
|
||||
it("escalates non-retryable continuation failures immediately without enqueuing another retry", async () => {
  // Seed an in-progress issue whose run failed with `budget_blocked` and no retry reason.
  const fixture = await seedStrandedIssueFixture({
    status: "in_progress",
    runStatus: "failed",
    runErrorCode: "budget_blocked",
    runError: "Budget exceeded; refusing to dispatch.",
  });
  const { companyId, agentId, issueId, runId } = fixture;
  const heartbeat = heartbeatService(db);

  // A single failure of this kind must escalate straight away.
  const outcome = await heartbeat.reconcileStrandedAssignedIssues();
  expect(outcome.continuationRequeued).toBe(0);
  expect(outcome.escalated).toBe(1);
  expect(outcome.issueIds).toEqual([issueId]);

  const issueRows = await db.select().from(issues).where(eq(issues.id, issueId));
  const issue = issueRows[0] ?? null;
  expect(issue?.status).toBe("blocked");

  await expectSourceScopedStrandedRecoveryAction({
    companyId,
    agentId,
    issueId,
    runId,
    previousStatus: "in_progress",
    retryReason: null,
  });

  // The single escalation comment must call out the non-retryable cause.
  const escalationComments = await db
    .select()
    .from(issueComments)
    .where(eq(issueComments.issueId, issueId));
  expect(escalationComments).toHaveLength(1);
  const commentBody = escalationComments[0]?.body;
  expect(commentBody).toContain("non-retryable failure");
  expect(commentBody).toContain("`budget_blocked`");

  // No run for this agent may carry the continuation retry reason.
  const followupRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
  const continuationRetryRun = followupRuns.find((run) => {
    const snapshot = run.contextSnapshot as Record<string, unknown> | null;
    return snapshot?.retryReason === "issue_continuation_needed";
  });
  expect(continuationRetryRun).toBeUndefined();

  // Settle every run other than the seeded one, one at a time.
  const otherRuns = followupRuns.filter((run) => run.id !== runId);
  for (const run of otherRuns) {
    await waitForRunToSettle(heartbeat, run.id);
  }
});
|
||||
|
||||
it("leaves the productive-but-stranded continuation path unchanged under the new classifier", async () => {
  // Seed an in-progress issue whose last run succeeded with liveness state "advanced".
  const fixture = await seedStrandedIssueFixture({
    status: "in_progress",
    runStatus: "succeeded",
    livenessState: "advanced",
  });
  const { agentId, issueId, runId } = fixture;
  const heartbeat = heartbeatService(db);

  // The issue is expected to be requeued, not escalated.
  const outcome = await heartbeat.reconcileStrandedAssignedIssues();
  expect(outcome.continuationRequeued).toBe(1);
  expect(outcome.escalated).toBe(0);
  expect(outcome.issueIds).toEqual([issueId]);

  // The follow-up run must be tagged with the productive-terminal recovery source.
  const agentRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
  const queuedRetry = agentRuns.find((run) => run.id !== runId);
  expect(queuedRetry?.contextSnapshot as Record<string, unknown> | undefined).toMatchObject({
    issueId,
    retryReason: "issue_continuation_needed",
    source: "issue.productive_terminal_continuation_recovery",
  });

  // Let the queued retry finish before the test returns.
  if (queuedRetry !== undefined) {
    await waitForRunToSettle(heartbeat, queuedRetry.id);
  }
});
|
||||
|
||||
it("reuses the raced stranded recovery issue when duplicate active recovery creation conflicts", async () => {
|
||||
const { companyId, issueId } = await seedStrandedIssueFixture({
|
||||
status: "in_progress",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue