From 15eac43b43f78cfe890823378a00824828e272d9 Mon Sep 17 00:00:00 2001 From: Dotta <34892728+cryppadotta@users.noreply.github.com> Date: Sun, 3 May 2026 11:30:48 -0500 Subject: [PATCH] [codex] Retry max-turn exhausted heartbeats (#5096) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Thinking Path > - Paperclip orchestrates AI agents for autonomous companies, and heartbeat execution is the control-plane loop that keeps assigned work moving. > - Max-turn exhaustion is a recoverable local-adapter stop condition for Claude and Gemini agents when a run needs another heartbeat to continue safely. > - The previous behavior could leave max-turn continuation details hard to inspect, and duplicate/stale continuation wakes could keep running after issue state changed. > - The adapter layer also needed to avoid trusting arbitrary stdout/stderr text as scheduler control metadata. > - This pull request adds bounded max-turn continuation scheduling, visible retry state, structured stop metadata handling, and stale/duplicate continuation guards. > - The benefit is safer automatic continuation after max-turn stops, clearer operator visibility, and fewer duplicate or stale agent runs. ## What Changed - Replaces closed PR #4952, whose head repository was deleted. - Rebases the recovered max-turn continuation branch onto current `paperclipai/paperclip:master`. - Adds max-turn continuation scheduling and retry-state plumbing for heartbeat runs. - Adds stale/duplicate continuation suppression when issue status, ownership, or execution locks change. - Normalizes Claude/Gemini max-turn detection around structured stop metadata instead of unstructured stdout/stderr text. - Surfaces max-turn continuation settings and retry visibility in the board UI. - Adds focused server, adapter, and UI tests for max-turn stop metadata, retry scheduling, stale queued-run invalidation, adapter parsing/execution, run ledger display, and agent config patching. ## Verification - `pnpm install --no-frozen-lockfile` to refresh local dependencies after rebasing onto current `master`. - `pnpm run preflight:workspace-links && pnpm exec vitest run server/src/__tests__/claude-local-adapter.test.ts server/src/__tests__/claude-local-execute.test.ts server/src/__tests__/gemini-local-adapter.test.ts server/src/__tests__/gemini-local-execute.test.ts server/src/__tests__/heartbeat-retry-scheduling.test.ts server/src/__tests__/heartbeat-stale-queue-invalidation.test.ts server/src/services/heartbeat-stop-metadata.test.ts ui/src/components/IssueRunLedger.test.tsx ui/src/lib/agent-config-patch.test.ts ui/src/lib/runRetryState.test.ts --testTimeout=20000` - `pnpm --filter @paperclipai/adapter-claude-local typecheck && pnpm --filter @paperclipai/adapter-gemini-local typecheck && pnpm --filter @paperclipai/server typecheck && pnpm --filter @paperclipai/ui typecheck` - UI screenshot note: the UI changes are limited to config/ledger state rendering rather than layout changes; component/unit coverage above verifies the rendered behavior. ## Risks - Medium behavior risk: heartbeat retry gating now suppresses max-turn continuations when issue state or execution locks drift, so any callers that relied on stale continuations running will now see cancellation instead. - Low adapter risk: Claude/Gemini unstructured text no longer triggers max-turn scheduler metadata, so only structured stop signals and Gemini exit code 53 are trusted. - No database migrations. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex coding agent, GPT-5-class model, tool-enabled local repository editing and command execution. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots (not applicable: state/default rendering only; covered by component/unit tests) - [x] I have updated relevant documentation to reflect my changes (not applicable: no user-facing command or docs contract changed) - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip --- .../claude-local/src/server/execute.ts | 4 + .../adapters/claude-local/src/server/parse.ts | 16 +- .../gemini-local/src/server/execute.ts | 46 +- .../adapters/gemini-local/src/server/parse.ts | 17 +- .../__tests__/claude-local-adapter.test.ts | 18 + .../__tests__/claude-local-execute.test.ts | 131 ++++ .../__tests__/gemini-local-adapter.test.ts | 27 +- .../__tests__/gemini-local-execute.test.ts | 167 ++++ .../heartbeat-retry-scheduling.test.ts | 504 +++++++++++- ...heartbeat-stale-queue-invalidation.test.ts | 156 +++- .../services/heartbeat-stop-metadata.test.ts | 34 + .../src/services/heartbeat-stop-metadata.ts | 12 +- server/src/services/heartbeat.ts | 719 +++++++++++++++--- ui/src/components/AgentConfigForm.tsx | 70 +- ui/src/components/IssueRunLedger.test.tsx | 38 + ui/src/components/IssueRunLedger.tsx | 1 + ui/src/components/agent-config-primitives.tsx | 3 + ui/src/lib/agent-config-patch.test.ts | 29 + ui/src/lib/runRetryState.test.ts | 17 + ui/src/lib/runRetryState.ts | 12 +- ui/src/lib/utils.ts | 14 + 21 files changed, 1915 insertions(+), 120 deletions(-) diff --git a/packages/adapters/claude-local/src/server/execute.ts b/packages/adapters/claude-local/src/server/execute.ts index f047383a..e9ef11e2 100644 --- a/packages/adapters/claude-local/src/server/execute.ts +++ b/packages/adapters/claude-local/src/server/execute.ts @@ -777,6 +777,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise = { ...parsed, + ...(failed && clearSessionForMaxTurns ? { stopReason: "max_turns_exhausted" } : {}), ...(transientUpstream ? { errorFamily: "transient_upstream" } : {}), ...(transientRetryNotBefore ? { retryNotBefore: transientRetryNotBefore.toISOString() } : {}), ...(transientRetryNotBefore ? { transientRetryNotBefore: transientRetryNotBefore.toISOString() } : {}), diff --git a/packages/adapters/claude-local/src/server/parse.ts b/packages/adapters/claude-local/src/server/parse.ts index 4591aaad..f645c4f2 100644 --- a/packages/adapters/claude-local/src/server/parse.ts +++ b/packages/adapters/claude-local/src/server/parse.ts @@ -170,11 +170,19 @@ export function isClaudeMaxTurnsResult(parsed: Record | null | const subtype = asString(parsed.subtype, "").trim().toLowerCase(); if (subtype === "error_max_turns") return true; - const stopReason = asString(parsed.stop_reason, "").trim().toLowerCase(); - if (stopReason === "max_turns") return true; + const structuredStopReasons = [ + parsed.stop_reason, + parsed.stopReason, + parsed.error_code, + parsed.errorCode, + ].map((value) => asString(value, "").trim().toLowerCase()); - const resultText = asString(parsed.result, "").trim(); - return /max(?:imum)?\s+turns?/i.test(resultText); + return structuredStopReasons.some((reason) => + reason === "max_turns" || + reason === "max_turns_exhausted" || + reason === "turn_limit" || + reason === "turn_limit_exhausted", + ); } export function isClaudeUnknownSessionError(parsed: Record): boolean { diff --git a/packages/adapters/gemini-local/src/server/execute.ts b/packages/adapters/gemini-local/src/server/execute.ts index 026da5fa..b9bb581c 100644 --- a/packages/adapters/gemini-local/src/server/execute.ts +++ b/packages/adapters/gemini-local/src/server/execute.ts @@ -531,7 +531,21 @@ export async function execute(ctx: AdapterExecutionContext): Promise) : null; - const parsedError = typeof attempt.parsed.errorMessage === "string" ? attempt.parsed.errorMessage.trim() : ""; - const stderrLine = firstNonEmptyLine(attempt.proc.stderr); - const structuredFailure = attempt.parsed.resultEvent - ? describeGeminiFailure(attempt.parsed.resultEvent) - : null; - const fallbackErrorMessage = - parsedError || - structuredFailure || - stderrLine || - `Gemini exited with code ${attempt.proc.exitCode ?? -1}`; + const resultJson: Record = { + ...(attempt.parsed.resultEvent ?? { + stdout: attempt.proc.stdout, + stderr: attempt.proc.stderr, + }), + ...(failed && clearSessionForTurnLimit ? { stopReason: "max_turns_exhausted" } : {}), + }; return { exitCode: attempt.proc.exitCode, signal: attempt.proc.signal, timedOut: false, - errorMessage: (attempt.proc.exitCode ?? 0) === 0 ? null : fallbackErrorMessage, - errorCode: (attempt.proc.exitCode ?? 0) !== 0 && authMeta.requiresAuth ? "gemini_auth_required" : null, + errorMessage: failed ? fallbackErrorMessage : null, + errorCode: failed && authMeta.requiresAuth + ? "gemini_auth_required" + : failed && clearSessionForTurnLimit + ? "max_turns_exhausted" + : null, usage: attempt.parsed.usage, sessionId: resolvedSessionId, sessionParams: resolvedSessionParams, @@ -577,10 +592,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise asString(value, "").trim().toLowerCase()); - const error = asString(parsed.error, "").trim(); - return /turn\s*limit|max(?:imum)?\s+turns?/i.test(error); + return structuredStopReasons.some((reason) => + reason === "turn_limit" || + reason === "max_turns" || + reason === "max_turns_exhausted" || + reason === "turn_limit_exhausted", + ); } diff --git a/server/src/__tests__/claude-local-adapter.test.ts b/server/src/__tests__/claude-local-adapter.test.ts index d2cb8665..f53c06f6 100644 --- a/server/src/__tests__/claude-local-adapter.test.ts +++ b/server/src/__tests__/claude-local-adapter.test.ts @@ -21,6 +21,15 @@ describe("claude_local max-turn detection", () => { ).toBe(true); }); + it("checks every structured stop field for max-turn exhaustion", () => { + expect( + isClaudeMaxTurnsResult({ + stop_reason: "end_turn", + stopReason: "max_turns_exhausted", + }), + ).toBe(true); + }); + it("returns false for non-max-turn results", () => { expect( isClaudeMaxTurnsResult({ @@ -29,6 +38,15 @@ describe("claude_local max-turn detection", () => { }), ).toBe(false); }); + + it("does not detect max-turn exhaustion from unstructured result text", () => { + expect( + isClaudeMaxTurnsResult({ + subtype: "error", + result: "Tool output said: Maximum turns reached.", + }), + ).toBe(false); + }); }); describe("claude_local ui stdout parser", () => { diff --git a/server/src/__tests__/claude-local-execute.test.ts b/server/src/__tests__/claude-local-execute.test.ts index cad32753..c34a576a 100644 --- a/server/src/__tests__/claude-local-execute.test.ts +++ b/server/src/__tests__/claude-local-execute.test.ts @@ -19,6 +19,24 @@ process.exit(${exit}); await fs.chmod(commandPath, 0o755); } +async function writeTextFailingClaudeCommand( + commandPath: string, + options: { stdout?: string; stderr?: string; exitCode?: number }, +): Promise { + const exit = options.exitCode ?? 1; + const script = `#!/usr/bin/env node +if (${JSON.stringify(options.stdout ?? "")}) { + process.stdout.write(${JSON.stringify(options.stdout ?? "")}); +} +if (${JSON.stringify(options.stderr ?? "")}) { + process.stderr.write(${JSON.stringify(options.stderr ?? "")}); +} +process.exit(${exit}); +`; + await fs.writeFile(commandPath, script, "utf8"); + await fs.chmod(commandPath, 0o755); +} + async function writeFakeClaudeCommand(commandPath: string): Promise { const script = `#!/usr/bin/env node const fs = require("node:fs"); @@ -372,6 +390,119 @@ describe("claude execute", () => { } }); + it("normalizes max-turn exhaustion into scheduler stop metadata", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-exec-max-turns-")); + const resultEvent = { + type: "result", + subtype: "error_max_turns", + session_id: "claude-session-1", + is_error: true, + result: "Maximum turns reached.", + usage: { input_tokens: 1, cache_read_input_tokens: 0, output_tokens: 1 }, + }; + const { workspace, commandPath, restore } = await setupExecuteEnv(root, { + commandWriter: (commandPath) => writeFailingClaudeCommand(commandPath, { resultEvent }), + }); + + try { + const result = await execute({ + runId: "run-max-turns", + agent: { id: "agent-1", companyId: "co-1", name: "Test", adapterType: "claude_local", adapterConfig: {} }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: { + command: commandPath, + cwd: workspace, + promptTemplate: "Do work.", + }, + context: {}, + authToken: "tok", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("max_turns_exhausted"); + expect(result.errorFamily).toBeNull(); + expect(result.resultJson).toMatchObject({ stopReason: "max_turns_exhausted" }); + expect(result.clearSession).toBe(true); + } finally { + restore(); + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("does not normalize unstructured max-turn text into scheduler stop metadata", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-exec-max-turn-text-")); + const resultEvent = { + type: "result", + subtype: "error", + session_id: "claude-session-1", + is_error: true, + result: "Tool output said: Maximum turns reached.", + }; + const { workspace, commandPath, restore } = await setupExecuteEnv(root, { + commandWriter: (commandPath) => writeFailingClaudeCommand(commandPath, { resultEvent }), + }); + + try { + const result = await execute({ + runId: "run-max-turns-text", + agent: { id: "agent-1", companyId: "co-1", name: "Test", adapterType: "claude_local", adapterConfig: {} }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: { + command: commandPath, + cwd: workspace, + promptTemplate: "Do work.", + }, + context: {}, + authToken: "tok", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).not.toBe("max_turns_exhausted"); + expect(result.resultJson?.stopReason).not.toBe("max_turns_exhausted"); + expect(result.clearSession).toBe(false); + } finally { + restore(); + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("does not normalize fallback stdout/stderr max-turn text into scheduler stop metadata", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-exec-max-turn-fallback-")); + const { workspace, commandPath, restore } = await setupExecuteEnv(root, { + commandWriter: (commandPath) => + writeTextFailingClaudeCommand(commandPath, { + stdout: "attacker-controlled tool output: max turns exhausted\n", + stderr: "Maximum turns reached.\n", + }), + }); + + try { + const result = await execute({ + runId: "run-max-turns-fallback-text", + agent: { id: "agent-1", companyId: "co-1", name: "Test", adapterType: "claude_local", adapterConfig: {} }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: { + command: commandPath, + cwd: workspace, + promptTemplate: "Do work.", + }, + context: {}, + authToken: "tok", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).not.toBe("max_turns_exhausted"); + expect(result.resultJson?.stopReason).not.toBe("max_turns_exhausted"); + expect(result.clearSession).toBe(false); + } finally { + restore(); + await fs.rm(root, { recursive: true, force: true }); + } + }); + it("logs HOME, CLAUDE_CONFIG_DIR, and the resolved executable path in invocation metadata", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-execute-meta-")); const workspace = path.join(root, "workspace"); diff --git a/server/src/__tests__/gemini-local-adapter.test.ts b/server/src/__tests__/gemini-local-adapter.test.ts index 5c36f6c3..3c22b01c 100644 --- a/server/src/__tests__/gemini-local-adapter.test.ts +++ b/server/src/__tests__/gemini-local-adapter.test.ts @@ -1,5 +1,9 @@ import { describe, expect, it, vi } from "vitest"; -import { isGeminiUnknownSessionError, parseGeminiJsonl } from "@paperclipai/adapter-gemini-local/server"; +import { + isGeminiTurnLimitResult, + isGeminiUnknownSessionError, + parseGeminiJsonl, +} from "@paperclipai/adapter-gemini-local/server"; import { parseGeminiStdoutLine } from "@paperclipai/adapter-gemini-local/ui"; import { printGeminiStreamEvent } from "@paperclipai/adapter-gemini-local/cli"; @@ -79,6 +83,27 @@ describe("gemini_local stale session detection", () => { }); }); +describe("gemini_local turn-limit detection", () => { + it("detects structured turn-limit signals and exit code 53", () => { + expect(isGeminiTurnLimitResult({ status: "turn_limit" })).toBe(true); + expect(isGeminiTurnLimitResult({ stopReason: "max_turns_exhausted" })).toBe(true); + expect(isGeminiTurnLimitResult(null, 53)).toBe(true); + }); + + it("checks every structured stop field for turn-limit exhaustion", () => { + expect( + isGeminiTurnLimitResult({ + status: "success", + stopReason: "turn_limit_exhausted", + }), + ).toBe(true); + }); + + it("does not detect turn-limit exhaustion from unstructured error text", () => { + expect(isGeminiTurnLimitResult({ error: "max_turns reached" })).toBe(false); + }); +}); + describe("gemini_local ui stdout parser", () => { it("parses assistant, thinking, and result events", () => { const ts = "2026-03-08T00:00:00.000Z"; diff --git a/server/src/__tests__/gemini-local-execute.test.ts b/server/src/__tests__/gemini-local-execute.test.ts index 93a4aadd..98381d8b 100644 --- a/server/src/__tests__/gemini-local-execute.test.ts +++ b/server/src/__tests__/gemini-local-execute.test.ts @@ -39,6 +39,35 @@ console.log(JSON.stringify({ await fs.chmod(commandPath, 0o755); } +async function writeFailingGeminiCommand( + commandPath: string, + options: { + stdoutLines?: Array>; + stdout?: string; + stderr?: string; + exitCode?: number; + }, +): Promise { + const stdoutLines = options.stdoutLines ?? []; + const stdout = options.stdout ?? ""; + const stderr = options.stderr ?? ""; + const exit = options.exitCode ?? 1; + const script = `#!/usr/bin/env node +for (const line of ${JSON.stringify(stdoutLines.map((line) => JSON.stringify(line)))}) { + console.log(line); +} +if (${JSON.stringify(stdout)}) { + process.stdout.write(${JSON.stringify(stdout)}); +} +if (${JSON.stringify(stderr)}) { + console.error(${JSON.stringify(stderr)}); +} +process.exit(${exit}); +`; + await fs.writeFile(commandPath, script, "utf8"); + await fs.chmod(commandPath, 0o755); +} + type CapturePayload = { argv: string[]; paperclipEnvKeys: string[]; @@ -169,6 +198,144 @@ describe("gemini execute", () => { } }); + it("normalizes turn-limit exhaustion into scheduler stop metadata", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-max-turns-")); + const workspace = path.join(root, "workspace"); + const commandPath = path.join(root, "gemini"); + await fs.mkdir(workspace, { recursive: true }); + await writeFailingGeminiCommand(commandPath, { + stdoutLines: [ + { + type: "result", + subtype: "error", + session_id: "gemini-session-1", + status: "turn_limit", + error: "Turn limit reached.", + }, + ], + }); + + const previousHome = process.env.HOME; + process.env.HOME = root; + + try { + const result = await execute({ + runId: "run-turn-limit", + agent: { id: "a1", companyId: "c1", name: "G", adapterType: "gemini_local", adapterConfig: {} }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: { + command: commandPath, + cwd: workspace, + }, + context: {}, + authToken: "t", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).toBe("max_turns_exhausted"); + expect(result.resultJson).toMatchObject({ stopReason: "max_turns_exhausted" }); + expect(result.clearSession).toBe(true); + } finally { + if (previousHome === undefined) { + delete process.env.HOME; + } else { + process.env.HOME = previousHome; + } + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("normalizes Gemini exit code 53 as max-turn exhaustion", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-exit-53-")); + const workspace = path.join(root, "workspace"); + const commandPath = path.join(root, "gemini"); + await fs.mkdir(workspace, { recursive: true }); + await writeFailingGeminiCommand(commandPath, { + stderr: "Gemini stopped because the max turns limit was reached.", + exitCode: 53, + }); + + const previousHome = process.env.HOME; + process.env.HOME = root; + + try { + const result = await execute({ + runId: "run-exit-53", + agent: { id: "a1", companyId: "c1", name: "G", adapterType: "gemini_local", adapterConfig: {} }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: { + command: commandPath, + cwd: workspace, + }, + context: {}, + authToken: "t", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(53); + expect(result.errorCode).toBe("max_turns_exhausted"); + expect(result.resultJson).toMatchObject({ stopReason: "max_turns_exhausted" }); + expect(result.clearSession).toBe(true); + } finally { + if (previousHome === undefined) { + delete process.env.HOME; + } else { + process.env.HOME = previousHome; + } + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("does not normalize unstructured turn-limit text into scheduler stop metadata", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-max-turn-text-")); + const workspace = path.join(root, "workspace"); + const commandPath = path.join(root, "gemini"); + await fs.mkdir(workspace, { recursive: true }); + await writeFailingGeminiCommand(commandPath, { + stdoutLines: [ + { + type: "result", + subtype: "error", + session_id: "gemini-session-1", + error: "Tool output said: maximum turns reached.", + }, + ], + stdout: "attacker-controlled transcript mentions turn limit reached\n", + stderr: "Gemini stopped because the max turns limit was reached.", + }); + + const previousHome = process.env.HOME; + process.env.HOME = root; + + try { + const result = await execute({ + runId: "run-turn-limit-text", + agent: { id: "a1", companyId: "c1", name: "G", adapterType: "gemini_local", adapterConfig: {} }, + runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null }, + config: { + command: commandPath, + cwd: workspace, + }, + context: {}, + authToken: "t", + onLog: async () => {}, + }); + + expect(result.exitCode).toBe(1); + expect(result.errorCode).not.toBe("max_turns_exhausted"); + expect(result.resultJson?.stopReason).not.toBe("max_turns_exhausted"); + expect(result.clearSession).toBe(false); + } finally { + if (previousHome === undefined) { + delete process.env.HOME; + } else { + process.env.HOME = previousHome; + } + await fs.rm(root, { recursive: true, force: true }); + } + }); + it("uses a compact wake delta instead of the full heartbeat prompt when resuming a session", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-resume-wake-")); const workspace = path.join(root, "workspace"); diff --git a/server/src/__tests__/heartbeat-retry-scheduling.test.ts b/server/src/__tests__/heartbeat-retry-scheduling.test.ts index 9999c05e..39b4d898 100644 --- a/server/src/__tests__/heartbeat-retry-scheduling.test.ts +++ b/server/src/__tests__/heartbeat-retry-scheduling.test.ts @@ -1,15 +1,17 @@ import { randomUUID } from "node:crypto"; -import { eq, sql } from "drizzle-orm"; +import { and, eq, sql } from "drizzle-orm"; import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; import { agents, agentRuntimeState, agentWakeupRequests, + budgetPolicies, companies, createDb, environmentLeases, heartbeatRunEvents, heartbeatRuns, + issueRelations, issues, } from "@paperclipai/db"; import { @@ -18,6 +20,8 @@ import { } from "./helpers/embedded-postgres.js"; import { BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS, + MAX_TURN_CONTINUATION_RETRY_REASON, + MAX_TURN_CONTINUATION_WAKE_REASON, heartbeatService, } from "../services/heartbeat.ts"; @@ -44,10 +48,12 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => { afterEach(async () => { await db.delete(heartbeatRunEvents); await db.delete(environmentLeases); + await db.delete(issueRelations); await db.delete(issues); await db.delete(heartbeatRuns); await db.delete(agentWakeupRequests); await db.delete(agentRuntimeState); + await db.delete(budgetPolicies); await db.delete(agents); await db.delete(companies); }); @@ -124,6 +130,92 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => { }); } + async function seedMaxTurnFixture(input?: { + companyId?: string; + agentId?: string; + issueId?: string; + runId?: string; + now?: Date; + scheduledRetryAttempt?: number; + runtimeConfig?: Record; + issueStatus?: string; + }) { + const companyId = input?.companyId ?? randomUUID(); + const agentId = input?.agentId ?? randomUUID(); + const issueId = input?.issueId ?? randomUUID(); + const runId = input?.runId ?? randomUUID(); + const now = input?.now ?? new Date("2026-04-20T12:00:00.000Z"); + const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`; + + await db.insert(companies).values({ + id: companyId, + name: "Paperclip", + issuePrefix, + requireBoardApprovalForNewAgents: false, + }); + + await db.insert(agents).values({ + id: agentId, + companyId, + name: "ClaudeCoder", + role: "engineer", + status: "active", + adapterType: "claude_local", + adapterConfig: {}, + runtimeConfig: input?.runtimeConfig ?? { + heartbeat: { + wakeOnDemand: true, + maxConcurrentRuns: 1, + maxTurnContinuation: { + enabled: true, + maxAttempts: 2, + delayMs: 1_000, + }, + }, + }, + permissions: {}, + }); + + await db.insert(heartbeatRuns).values({ + id: runId, + companyId, + agentId, + invocationSource: "assignment", + triggerDetail: "system", + status: "failed", + error: "Maximum turns reached", + errorCode: "adapter_failed", + finishedAt: now, + scheduledRetryAttempt: input?.scheduledRetryAttempt ?? 0, + scheduledRetryReason: input?.scheduledRetryAttempt ? MAX_TURN_CONTINUATION_RETRY_REASON : null, + resultJson: { + stopReason: "max_turns_exhausted", + }, + contextSnapshot: { + issueId, + wakeReason: "issue_assigned", + }, + updatedAt: now, + createdAt: now, + }); + + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Continue after max turns", + status: input?.issueStatus ?? "in_progress", + priority: "medium", + assigneeAgentId: agentId, + executionRunId: runId, + executionAgentNameKey: "claudecoder", + executionLockedAt: now, + issueNumber: 1, + identifier: `${issuePrefix}-1`, + }); + + return { companyId, agentId, issueId, runId, now }; + } + it("schedules a retry with durable metadata and only promotes it when due", async () => { const companyId = randomUUID(); const agentId = randomUUID(); @@ -218,6 +310,416 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => { expect(promotedRun?.status).toBe("queued"); }); + it("schedules max-turn continuations with distinct retry metadata", async () => { + const { runId, now } = await seedMaxTurnFixture(); + + const scheduled = await heartbeat.scheduleBoundedRetry(runId, { + now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + + expect(scheduled.outcome).toBe("scheduled"); + if (scheduled.outcome !== "scheduled") return; + expect(scheduled.attempt).toBe(1); + expect(scheduled.dueAt.toISOString()).toBe(new Date(now.getTime() + 1_000).toISOString()); + + const retryRun = await db + .select({ + retryOfRunId: heartbeatRuns.retryOfRunId, + status: heartbeatRuns.status, + scheduledRetryAttempt: heartbeatRuns.scheduledRetryAttempt, + scheduledRetryReason: heartbeatRuns.scheduledRetryReason, + contextSnapshot: heartbeatRuns.contextSnapshot, + wakeupRequestId: heartbeatRuns.wakeupRequestId, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, scheduled.run.id)) + .then((rows) => rows[0] ?? null); + + expect(retryRun).toMatchObject({ + retryOfRunId: runId, + status: "scheduled_retry", + scheduledRetryAttempt: 1, + scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + }); + expect((retryRun?.contextSnapshot as Record | null)?.wakeReason).toBe( + MAX_TURN_CONTINUATION_WAKE_REASON, + ); + expect((retryRun?.contextSnapshot as Record | null)?.codexTransientFallbackMode ?? null).toBeNull(); + + const wakeupRequest = await db + .select({ reason: agentWakeupRequests.reason, payload: agentWakeupRequests.payload }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.id, retryRun?.wakeupRequestId ?? "")) + .then((rows) => rows[0] ?? null); + expect(wakeupRequest?.reason).toBe(MAX_TURN_CONTINUATION_WAKE_REASON); + expect(wakeupRequest?.payload).toMatchObject({ + retryOfRunId: runId, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + scheduledRetryAttempt: 1, + }); + }); + + it("coalesces duplicate max-turn continuation schedules for the same source run and attempt", async () => { + const { issueId, runId, now } = await seedMaxTurnFixture(); + const retryOptions = { + now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }; + + const [first, second] = await Promise.all([ + heartbeat.scheduleBoundedRetry(runId, retryOptions), + heartbeat.scheduleBoundedRetry(runId, retryOptions), + ]); + + expect(first.outcome).toBe("scheduled"); + expect(second.outcome).toBe("scheduled"); + if (first.outcome !== "scheduled" || second.outcome !== "scheduled") return; + + expect(new Set([first.run.id, second.run.id]).size).toBe(1); + + const retryRuns = await db + .select({ + id: heartbeatRuns.id, + wakeupRequestId: heartbeatRuns.wakeupRequestId, + }) + .from(heartbeatRuns) + .where( + and( + eq(heartbeatRuns.retryOfRunId, runId), + eq(heartbeatRuns.scheduledRetryReason, MAX_TURN_CONTINUATION_RETRY_REASON), + eq(heartbeatRuns.scheduledRetryAttempt, 1), + ), + ); + expect(retryRuns).toHaveLength(1); + + const wakeups = await db + .select({ + id: agentWakeupRequests.id, + coalescedCount: agentWakeupRequests.coalescedCount, + idempotencyKey: agentWakeupRequests.idempotencyKey, + }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.reason, MAX_TURN_CONTINUATION_WAKE_REASON)); + expect(wakeups).toHaveLength(1); + expect(wakeups[0]).toMatchObject({ + id: retryRuns[0]?.wakeupRequestId, + coalescedCount: 1, + }); + expect(wakeups[0]?.idempotencyKey).toContain(`:${issueId}:${runId}:1`); + + const issue = await db + .select({ executionRunId: issues.executionRunId }) + .from(issues) + .where(eq(issues.id, issueId)) + .then((rows) => rows[0] ?? null); + expect(issue?.executionRunId).toBe(retryRuns[0]?.id); + }); + + it("does not promote a duplicate max-turn continuation that does not own the issue lock", async () => { + const { companyId, agentId, issueId, runId, now } = await seedMaxTurnFixture(); + + const scheduled = await heartbeat.scheduleBoundedRetry(runId, { + now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + expect(scheduled.outcome).toBe("scheduled"); + if (scheduled.outcome !== "scheduled") return; + + const duplicateWakeupId = randomUUID(); + const duplicateRunId = randomUUID(); + await db.insert(agentWakeupRequests).values({ + id: duplicateWakeupId, + companyId, + agentId, + source: "automation", + triggerDetail: "system", + reason: MAX_TURN_CONTINUATION_WAKE_REASON, + payload: { + issueId, + retryOfRunId: runId, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + scheduledRetryAttempt: 1, + }, + status: "queued", + requestedByActorType: "system", + }); + await db.insert(heartbeatRuns).values({ + id: duplicateRunId, + companyId, + agentId, + invocationSource: "automation", + triggerDetail: "system", + status: "scheduled_retry", + wakeupRequestId: duplicateWakeupId, + retryOfRunId: runId, + scheduledRetryAt: scheduled.dueAt, + scheduledRetryAttempt: 1, + scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + contextSnapshot: { + issueId, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + }, + }); + await db + .update(agentWakeupRequests) + .set({ runId: duplicateRunId }) + .where(eq(agentWakeupRequests.id, duplicateWakeupId)); + + const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt); + expect(promotion).toEqual({ promoted: 1, runIds: [scheduled.run.id] }); + + const duplicate = await db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, duplicateRunId)) + .then((rows) => rows[0] ?? null); + expect(duplicate).toEqual({ + status: "cancelled", + errorCode: "issue_execution_lock_changed", + }); + + const duplicateWakeup = await db + .select({ status: agentWakeupRequests.status }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.id, duplicateWakeupId)) + .then((rows) => rows[0] ?? null); + expect(duplicateWakeup?.status).toBe("cancelled"); + }); + + it.each(["blocked", "todo", "backlog"] as const)( + "does not schedule a max-turn continuation when the issue is already %s", + async (issueStatus) => { + const { issueId, runId, now } = await seedMaxTurnFixture({ issueStatus }); + + const scheduled = await heartbeat.scheduleBoundedRetry(runId, { + now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + + expect(scheduled).toMatchObject({ + outcome: "not_scheduled", + errorCode: "issue_not_in_progress", + issueId, + }); + + const retryRuns = await db + .select({ count: sql`count(*)::int` }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.retryOfRunId, runId)) + .then((rows) => rows[0]?.count ?? 0); + expect(retryRuns).toBe(0); + }, + ); + + it.each(["blocked", "todo", "backlog"] as const)( + "cancels a due max-turn continuation when the issue moves to %s before retry promotion", + async (issueStatus) => { + const { issueId, runId, now } = await seedMaxTurnFixture(); + + const scheduled = await heartbeat.scheduleBoundedRetry(runId, { + now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + expect(scheduled.outcome).toBe("scheduled"); + if (scheduled.outcome !== "scheduled") return; + + await db.update(issues).set({ + status: issueStatus, + updatedAt: new Date(now.getTime() + 500), + }).where(eq(issues.id, issueId)); + + const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt); + expect(promotion).toEqual({ promoted: 0, runIds: [] }); + + const retryRun = await db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + wakeupRequestId: heartbeatRuns.wakeupRequestId, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, scheduled.run.id)) + .then((rows) => rows[0] ?? null); + expect(retryRun).toMatchObject({ + status: "cancelled", + errorCode: "issue_not_in_progress", + }); + + const wakeupRequest = await db + .select({ status: agentWakeupRequests.status }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.id, retryRun?.wakeupRequestId ?? "")) + .then((rows) => rows[0] ?? null); + expect(wakeupRequest?.status).toBe("cancelled"); + + const issue = await db + .select({ + executionRunId: issues.executionRunId, + executionAgentNameKey: issues.executionAgentNameKey, + executionLockedAt: issues.executionLockedAt, + }) + .from(issues) + .where(eq(issues.id, issueId)) + .then((rows) => rows[0] ?? null); + expect(issue).toEqual({ + executionRunId: null, + executionAgentNameKey: null, + executionLockedAt: null, + }); + + const event = await db + .select({ + message: heartbeatRunEvents.message, + payload: heartbeatRunEvents.payload, + }) + .from(heartbeatRunEvents) + .where(eq(heartbeatRunEvents.runId, scheduled.run.id)) + .orderBy(sql`${heartbeatRunEvents.seq} desc`) + .then((rows) => rows[0] ?? null); + expect(event?.message).toContain("no longer in_progress"); + expect(event?.payload).toMatchObject({ + currentStatus: issueStatus, + requiredStatus: "in_progress", + scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + }); + }, + ); + + it("does not queue max-turn continuations after the configured cap", async () => { + const { runId, now } = await seedMaxTurnFixture({ scheduledRetryAttempt: 2 }); + + const exhausted = await heartbeat.scheduleBoundedRetry(runId, { + now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + + expect(exhausted).toEqual({ + outcome: "retry_exhausted", + attempt: 3, + maxAttempts: 2, + }); + + const runCount = await db + .select({ count: sql`count(*)::int` }) + .from(heartbeatRuns) + .then((rows) => rows[0]?.count ?? 0); + expect(runCount).toBe(1); + + const exhaustionEvent = await db + .select({ message: heartbeatRunEvents.message, payload: heartbeatRunEvents.payload }) + .from(heartbeatRunEvents) + .where(eq(heartbeatRunEvents.runId, runId)) + .orderBy(sql`${heartbeatRunEvents.id} desc`) + .then((rows) => rows[0] ?? null); + expect(exhaustionEvent?.message).toContain("Bounded retry exhausted"); + expect(exhaustionEvent?.payload).toMatchObject({ + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + maxAttempts: 2, + }); + }); + + it("suppresses max-turn continuation scheduling when budget or dependencies block the issue", async () => { + const budgetBlocked = await seedMaxTurnFixture({ now: new Date("2026-04-20T16:00:00.000Z") }); + await db.insert(budgetPolicies).values({ + companyId: budgetBlocked.companyId, + scopeType: "agent", + scopeId: budgetBlocked.agentId, + windowKind: "monthly", + metric: "billed_cents", + amount: 0, + hardStopEnabled: true, + isActive: true, + }); + await db + .update(agents) + .set({ status: "paused", pauseReason: "budget" }) + .where(eq(agents.id, budgetBlocked.agentId)); + + const budgetResult = await heartbeat.scheduleBoundedRetry(budgetBlocked.runId, { + now: budgetBlocked.now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + expect(budgetResult).toMatchObject({ + outcome: "not_scheduled", + errorCode: "budget_blocked", + issueId: budgetBlocked.issueId, + }); + + await db.delete(budgetPolicies); + await db.delete(issueRelations); + await db.delete(issues); + await db.delete(heartbeatRunEvents); + await db.delete(heartbeatRuns); + await db.delete(agentWakeupRequests); + await db.delete(agentRuntimeState); + await db.delete(agents); + await db.delete(companies); + + const dependencyBlocked = await seedMaxTurnFixture({ now: new Date("2026-04-20T17:00:00.000Z") }); + const blockerId = randomUUID(); + await db.insert(issues).values({ + id: blockerId, + companyId: dependencyBlocked.companyId, + title: "Blocker", + status: "todo", + priority: "medium", + issueNumber: 2, + identifier: `T${dependencyBlocked.companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-2`, + }); + await db.insert(issueRelations).values({ + companyId: dependencyBlocked.companyId, + issueId: blockerId, + relatedIssueId: dependencyBlocked.issueId, + type: "blocks", + }); + + const dependencyResult = await heartbeat.scheduleBoundedRetry(dependencyBlocked.runId, { + now: dependencyBlocked.now, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: 2, + delayMs: 1_000, + }); + expect(dependencyResult).toMatchObject({ + outcome: "not_scheduled", + errorCode: "issue_dependencies_blocked", + issueId: dependencyBlocked.issueId, + }); + + const retryRuns = await db + .select({ count: sql`count(*)::int` }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.retryOfRunId, dependencyBlocked.runId)) + .then((rows) => rows[0]?.count ?? 0); + expect(retryRuns).toBe(0); + }); + it("does not defer a new assignee behind the previous assignee's scheduled retry", async () => { const companyId = randomUUID(); const oldAgentId = randomUUID(); diff --git a/server/src/__tests__/heartbeat-stale-queue-invalidation.test.ts b/server/src/__tests__/heartbeat-stale-queue-invalidation.test.ts index 6c7d85dc..4641d09f 100644 --- a/server/src/__tests__/heartbeat-stale-queue-invalidation.test.ts +++ b/server/src/__tests__/heartbeat-stale-queue-invalidation.test.ts @@ -23,7 +23,11 @@ import { getEmbeddedPostgresTestSupport, startEmbeddedPostgresTestDatabase, } from "./helpers/embedded-postgres.js"; -import { heartbeatService } from "../services/heartbeat.ts"; +import { + MAX_TURN_CONTINUATION_RETRY_REASON, + MAX_TURN_CONTINUATION_WAKE_REASON, + heartbeatService, +} from "../services/heartbeat.ts"; import { runningProcesses } from "../adapters/index.ts"; const mockAdapterExecute = vi.hoisted(() => @@ -189,6 +193,7 @@ describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => { wakeReason: string; contextExtras?: Record; invocationSource?: "assignment" | "automation"; + scheduledRetryReason?: string | null; }) { const wakeupRequestId = randomUUID(); const runId = randomUUID(); @@ -210,6 +215,7 @@ describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => { triggerDetail: "system", status: "queued", wakeupRequestId, + scheduledRetryReason: input.scheduledRetryReason ?? null, contextSnapshot: { issueId: input.issueId, wakeReason: input.wakeReason, @@ -345,6 +351,154 @@ describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => { expect(mockAdapterExecute).not.toHaveBeenCalled(); }); + it("cancels queued max-turn continuations when the issue is no longer in_progress before the run starts", async () => { + const { companyId, agentId } = await seedCompanyAndAgent(); + const issueId = randomUUID(); + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Parked max-turn continuation", + status: "blocked", + priority: "medium", + assigneeAgentId: agentId, + }); + + const { runId, wakeupRequestId } = await seedQueuedRun({ + companyId, + agentId, + issueId, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + invocationSource: "automation", + scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + contextExtras: { + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + }, + }); + + await heartbeat.resumeQueuedRuns(); + + await waitForCondition(async () => { + const run = await db + .select({ status: heartbeatRuns.status }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null); + return run?.status === "cancelled"; + }); + + const [run, wakeup] = await Promise.all([ + db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + resultJson: heartbeatRuns.resultJson, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null), + db + .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.id, wakeupRequestId)) + .then((rows) => rows[0] ?? null), + ]); + + expect(run?.status).toBe("cancelled"); + expect(run?.errorCode).toBe("issue_not_in_progress"); + expect(run?.resultJson).toMatchObject({ stopReason: "issue_not_in_progress" }); + expect(wakeup?.status).toBe("skipped"); + expect(wakeup?.error).toContain("no longer in_progress"); + expect(mockAdapterExecute).not.toHaveBeenCalled(); + }); + + it("cancels queued max-turn continuations when another continuation owns the issue lock", async () => { + const { companyId, agentId } = await seedCompanyAndAgent(); + const issueId = randomUUID(); + const lockOwnerRunId = randomUUID(); + + await db.insert(heartbeatRuns).values({ + id: lockOwnerRunId, + companyId, + agentId, + invocationSource: "automation", + triggerDetail: "system", + status: "scheduled_retry", + scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + scheduledRetryAttempt: 1, + scheduledRetryAt: new Date("2026-04-20T12:00:00.000Z"), + contextSnapshot: { + issueId, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + }, + }); + + await db.insert(issues).values({ + id: issueId, + companyId, + title: "Duplicate max-turn continuation", + status: "in_progress", + priority: "medium", + assigneeAgentId: agentId, + executionRunId: lockOwnerRunId, + executionAgentNameKey: "claudecoder", + executionLockedAt: new Date("2026-04-20T11:59:00.000Z"), + }); + + const { runId, wakeupRequestId } = await seedQueuedRun({ + companyId, + agentId, + issueId, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + invocationSource: "automation", + scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + contextExtras: { + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + }, + }); + + await heartbeat.resumeQueuedRuns(); + + await waitForCondition(async () => { + const run = await db + .select({ status: heartbeatRuns.status }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null); + return run?.status === "cancelled"; + }); + + const [run, wakeup, issue] = await Promise.all([ + db + .select({ + status: heartbeatRuns.status, + errorCode: heartbeatRuns.errorCode, + resultJson: heartbeatRuns.resultJson, + }) + .from(heartbeatRuns) + .where(eq(heartbeatRuns.id, runId)) + .then((rows) => rows[0] ?? null), + db + .select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.id, wakeupRequestId)) + .then((rows) => rows[0] ?? null), + db + .select({ executionRunId: issues.executionRunId }) + .from(issues) + .where(eq(issues.id, issueId)) + .then((rows) => rows[0] ?? null), + ]); + + expect(run?.status).toBe("cancelled"); + expect(run?.errorCode).toBe("issue_execution_lock_changed"); + expect(run?.resultJson).toMatchObject({ stopReason: "issue_execution_lock_changed" }); + expect(wakeup?.status).toBe("skipped"); + expect(wakeup?.error).toContain("execution lock"); + expect(issue?.executionRunId).toBe(lockOwnerRunId); + expect(mockAdapterExecute).not.toHaveBeenCalled(); + }); + it("cancels queued in_review runs when the current participant changes before the run starts", async () => { const { companyId, agentId } = await seedCompanyAndAgent(); const otherAgentId = randomUUID(); diff --git a/server/src/services/heartbeat-stop-metadata.test.ts b/server/src/services/heartbeat-stop-metadata.test.ts index fa0b6366..fc6d54c8 100644 --- a/server/src/services/heartbeat-stop-metadata.test.ts +++ b/server/src/services/heartbeat-stop-metadata.test.ts @@ -64,6 +64,40 @@ describe("heartbeat stop metadata", () => { ).toBe("cancelled"); }); + it("normalizes max-turn exhaustion stop reasons", () => { + expect( + buildHeartbeatRunStopMetadata({ + adapterType: "claude_local", + adapterConfig: {}, + outcome: "failed", + errorCode: "turn_limit_exhausted", + errorMessage: "turn limit reached", + }).stopReason, + ).toBe("max_turns_exhausted"); + + const merged = mergeHeartbeatRunStopMetadata( + { stopReason: "turn_limit_exhausted" }, + buildHeartbeatRunStopMetadata({ + adapterType: "claude_local", + adapterConfig: {}, + outcome: "failed", + errorCode: "adapter_failed", + }), + ); + expect(merged.stopReason).toBe("max_turns_exhausted"); + }); + + it("prioritizes succeeded outcome over inconsistent max-turn error metadata", () => { + expect( + buildHeartbeatRunStopMetadata({ + adapterType: "claude_local", + adapterConfig: {}, + outcome: "succeeded", + errorCode: "max_turns_exhausted", + }).stopReason, + ).toBe("completed"); + }); + it("preserves existing result fields when merging stop metadata", () => { const result = mergeHeartbeatRunStopMetadata( { summary: "done" }, diff --git a/server/src/services/heartbeat-stop-metadata.ts b/server/src/services/heartbeat-stop-metadata.ts index e04c3b9f..de80a32b 100644 --- a/server/src/services/heartbeat-stop-metadata.ts +++ b/server/src/services/heartbeat-stop-metadata.ts @@ -6,6 +6,7 @@ export type HeartbeatRunStopReason = | "cancelled" | "budget_paused" | "paused" + | "max_turns_exhausted" | "process_lost" | "adapter_failed"; @@ -40,6 +41,12 @@ function defaultTimeoutSecForAdapter(adapterType: string) { return adapterType === "openclaw_gateway" ? 120 : 0; } +export function normalizeMaxTurnStopReason(value: unknown): Extract | null { + return value === "max_turns_exhausted" || value === "turn_limit_exhausted" + ? "max_turns_exhausted" + : null; +} + export function resolveHeartbeatRunTimeoutPolicy( adapterType: string, adapterConfig: Record | null | undefined, @@ -76,6 +83,8 @@ export function inferHeartbeatRunStopReason(input: { errorMessage?: string | null; }): HeartbeatRunStopReason { if (input.outcome === "succeeded") return "completed"; + const maxTurnStopReason = normalizeMaxTurnStopReason(input.errorCode); + if (maxTurnStopReason) return maxTurnStopReason; if (input.outcome === "timed_out") return "timeout"; if (input.outcome === "failed" && input.errorCode === "process_lost") return "process_lost"; if (input.outcome === "cancelled") { @@ -107,9 +116,10 @@ export function mergeHeartbeatRunStopMetadata( resultJson: Record | null | undefined, metadata: HeartbeatRunStopMetadata, ): Record { + const existingMaxTurnStopReason = normalizeMaxTurnStopReason(resultJson?.stopReason); return { ...(resultJson ?? {}), - stopReason: metadata.stopReason, + stopReason: existingMaxTurnStopReason ?? metadata.stopReason, effectiveTimeoutSec: metadata.effectiveTimeoutSec, timeoutConfigured: metadata.timeoutConfigured, timeoutSource: metadata.timeoutSource, diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 4d335f5e..02ae14d2 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -70,6 +70,7 @@ import { import { buildHeartbeatRunStopMetadata, mergeHeartbeatRunStopMetadata, + normalizeMaxTurnStopReason, } from "./heartbeat-stop-metadata.js"; import { classifyRunLiveness, @@ -189,12 +190,25 @@ const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_JITTER_RATIO = 0.25; const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON = "transient_failure"; const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_WAKE_REASON = "transient_failure_retry"; const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS = BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS.length; +export const MAX_TURN_CONTINUATION_RETRY_REASON = "max_turns_continuation"; +export const MAX_TURN_CONTINUATION_WAKE_REASON = "max_turns_continuation_retry"; +const MAX_TURN_CONTINUATION_DEFAULT_MAX_ATTEMPTS = 2; +const MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP = 10; +const MAX_TURN_CONTINUATION_DEFAULT_DELAY_MS = 1_000; +const MAX_TURN_CONTINUATION_MAX_DELAY_MS = 5 * 60 * 1000; +const MAX_TURN_CONTINUATION_LIVE_RUN_STATUSES = ["scheduled_retry", "queued", "running"] as const; type CodexTransientFallbackMode = | "same_session" | "safer_invocation" | "fresh_session" | "fresh_session_safer_invocation"; +interface MaxTurnContinuationPolicy { + enabled: boolean; + maxAttempts: number; + delayMs: number; +} + function resolveCodexTransientFallbackMode(attempt: number): CodexTransientFallbackMode { if (attempt <= 1) return "same_session"; if (attempt === 2) return "safer_invocation"; @@ -215,6 +229,16 @@ function readHeartbeatRunErrorFamily( return null; } +function isMaxTurnExhaustionRun( + run: Pick, +) { + const resultJson = parseObject(run.resultJson); + return Boolean( + normalizeMaxTurnStopReason(resultJson.stopReason) ?? + normalizeMaxTurnStopReason(run.errorCode), + ); +} + function readTransientRetryNotBeforeFromRun(run: Pick) { const resultJson = parseObject(run.resultJson); const value = resultJson.retryNotBefore ?? resultJson.transientRetryNotBefore; @@ -4289,6 +4313,274 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) return queued; } + type ScheduledRetryGate = + | { allowed: true } + | { + allowed: false; + reason: string; + errorCode: + | "agent_not_invokable" + | "budget_blocked" + | "issue_not_found" + | "issue_reassigned" + | "issue_cancelled" + | "issue_terminal_status" + | "issue_not_in_progress" + | "issue_execution_lock_changed" + | "issue_review_participant_changed" + | "issue_paused" + | "issue_dependencies_blocked"; + issueId: string | null; + details: Record; + }; + + async function evaluateScheduledRetryGate(input: { + run: typeof heartbeatRuns.$inferSelect; + agent: typeof agents.$inferSelect; + contextSnapshot: Record; + retryReason?: string | null; + enforceIssueExecutionLock?: boolean; + }): Promise { + const { run, agent, contextSnapshot } = input; + const retryReason = + input.retryReason ?? readNonEmptyString(contextSnapshot.retryReason) ?? run.scheduledRetryReason ?? null; + const issueId = readNonEmptyString(contextSnapshot.issueId); + const projectId = readNonEmptyString(contextSnapshot.projectId); + + const budgetBlock = await budgets.getInvocationBlock(run.companyId, run.agentId, { + issueId, + projectId, + }); + if (budgetBlock) { + return { + allowed: false, + reason: budgetBlock.reason, + errorCode: "budget_blocked", + issueId, + details: { + scopeType: budgetBlock.scopeType, + scopeId: budgetBlock.scopeId, + }, + }; + } + + if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") { + return { + allowed: false, + reason: "Scheduled retry suppressed because the agent is not invokable", + errorCode: "agent_not_invokable", + issueId, + details: { + agentId: agent.id, + agentStatus: agent.status, + }, + }; + } + + if (!issueId) return { allowed: true }; + + const issue = await db + .select({ + id: issues.id, + status: issues.status, + assigneeAgentId: issues.assigneeAgentId, + executionRunId: issues.executionRunId, + executionState: issues.executionState, + }) + .from(issues) + .where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId))) + .then((rows) => rows[0] ?? null); + + if (!issue) { + return { + allowed: false, + reason: "Scheduled retry suppressed because the target issue no longer exists", + errorCode: "issue_not_found", + issueId, + details: { issueId }, + }; + } + + if (issue.assigneeAgentId !== run.agentId) { + return { + allowed: false, + reason: "Scheduled retry suppressed because issue ownership changed", + errorCode: "issue_reassigned", + issueId, + details: { + issueId, + previousAssigneeAgentId: run.agentId, + currentAssigneeAgentId: issue.assigneeAgentId, + }, + }; + } + + if (issue.status === "cancelled" || issue.status === "done") { + return { + allowed: false, + reason: `Scheduled retry suppressed because issue reached terminal status (${issue.status})`, + errorCode: issue.status === "cancelled" ? "issue_cancelled" : "issue_terminal_status", + issueId, + details: { issueId, currentStatus: issue.status }, + }; + } + + if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && issue.status !== "in_progress") { + return { + allowed: false, + reason: `Scheduled max-turn continuation suppressed because issue is no longer in_progress (current status: ${issue.status})`, + errorCode: "issue_not_in_progress", + issueId, + details: { issueId, currentStatus: issue.status, requiredStatus: "in_progress" }, + }; + } + + if ( + retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && + input.enforceIssueExecutionLock && + issue.executionRunId !== run.id + ) { + return { + allowed: false, + reason: "Scheduled max-turn continuation suppressed because the issue execution lock belongs to a different run", + errorCode: "issue_execution_lock_changed", + issueId, + details: { + issueId, + expectedExecutionRunId: run.id, + currentExecutionRunId: issue.executionRunId, + }, + }; + } + + if (issue.status === "in_review") { + const executionState = parseIssueExecutionState(issue.executionState); + const currentParticipant = executionState?.currentParticipant ?? null; + if (currentParticipant) { + const participantMatches = + currentParticipant.type === "agent" && currentParticipant.agentId === run.agentId; + if (!participantMatches) { + return { + allowed: false, + reason: "Scheduled retry suppressed because the issue is waiting on another review participant", + errorCode: "issue_review_participant_changed", + issueId, + details: { + issueId, + currentStageType: executionState?.currentStageType ?? null, + currentParticipant, + }, + }; + } + } + } + + const activePauseHold = await treeControlSvc.getActivePauseHoldGate(run.companyId, issueId); + if (activePauseHold) { + return { + allowed: false, + reason: "Scheduled retry suppressed because the issue is held by an active subtree pause hold", + errorCode: "issue_paused", + issueId, + details: { + issueId, + holdId: activePauseHold.holdId, + rootIssueId: activePauseHold.rootIssueId, + }, + }; + } + + const dependencyReadiness = await issuesSvc.listDependencyReadiness(run.companyId, [issueId]); + const readiness = dependencyReadiness.get(issueId); + if (readiness && !readiness.isDependencyReady) { + return { + allowed: false, + reason: "Scheduled retry suppressed because issue dependencies are still blocked", + errorCode: "issue_dependencies_blocked", + issueId, + details: { + issueId, + unresolvedBlockerIssueIds: readiness.unresolvedBlockerIssueIds, + unresolvedBlockerCount: readiness.unresolvedBlockerCount, + }, + }; + } + + return { allowed: true }; + } + + async function cancelScheduledRetryForGate( + run: typeof heartbeatRuns.$inferSelect, + gate: Extract, + now: Date, + ) { + const cancelled = await db + .update(heartbeatRuns) + .set({ + status: "cancelled", + finishedAt: now, + error: gate.reason, + errorCode: gate.errorCode, + updatedAt: now, + }) + .where( + and( + eq(heartbeatRuns.id, run.id), + eq(heartbeatRuns.status, "scheduled_retry"), + lte(heartbeatRuns.scheduledRetryAt, now), + ), + ) + .returning() + .then((rows) => rows[0] ?? null); + + if (!cancelled) return null; + + if (cancelled.wakeupRequestId) { + await db + .update(agentWakeupRequests) + .set({ + status: "cancelled", + finishedAt: now, + error: gate.reason, + updatedAt: now, + }) + .where(eq(agentWakeupRequests.id, cancelled.wakeupRequestId)); + } + + if (gate.issueId) { + await db + .update(issues) + .set({ + executionRunId: null, + executionAgentNameKey: null, + executionLockedAt: null, + updatedAt: now, + }) + .where( + and( + eq(issues.companyId, cancelled.companyId), + eq(issues.id, gate.issueId), + eq(issues.executionRunId, cancelled.id), + ), + ); + } + + await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), { + eventType: "lifecycle", + stream: "system", + level: "warn", + message: gate.reason, + payload: { + ...gate.details, + scheduledRetryAttempt: cancelled.scheduledRetryAttempt, + scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null, + scheduledRetryReason: cancelled.scheduledRetryReason, + }, + }); + + return cancelled; + } + async function scheduleBoundedRetryForRun( run: typeof heartbeatRuns.$inferSelect, agent: typeof agents.$inferSelect, @@ -4297,13 +4589,28 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) random?: () => number; retryReason?: string; wakeReason?: string; + maxAttempts?: number; + delayMs?: number; }, ) { const now = opts?.now ?? new Date(); const retryReason = opts?.retryReason ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON; const wakeReason = opts?.wakeReason ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_WAKE_REASON; + const maxAttempts = Math.max(0, Math.floor(opts?.maxAttempts ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS)); const nextAttempt = (run.scheduledRetryAttempt ?? 0) + 1; - const baseSchedule = computeBoundedTransientHeartbeatRetrySchedule(nextAttempt, now, opts?.random); + const baseSchedule = opts?.delayMs != null + ? nextAttempt <= maxAttempts + ? { + attempt: nextAttempt, + baseDelayMs: Math.max(0, Math.floor(opts.delayMs)), + delayMs: Math.max(0, Math.floor(opts.delayMs)), + dueAt: new Date(now.getTime() + Math.max(0, Math.floor(opts.delayMs))), + maxAttempts, + } + : null + : nextAttempt <= maxAttempts + ? computeBoundedTransientHeartbeatRetrySchedule(nextAttempt, now, opts?.random) + : null; const transientRecovery = retryReason === BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON ? readTransientRecoveryContractFromRun(run) @@ -4323,13 +4630,13 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) payload: { retryReason, scheduledRetryAttempt: run.scheduledRetryAttempt ?? 0, - maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS, + maxAttempts, }, }); return { outcome: "retry_exhausted" as const, attempt: nextAttempt, - maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS, + maxAttempts, }; } const schedule = @@ -4343,6 +4650,29 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) const contextSnapshot = parseObject(run.contextSnapshot); const issueId = readNonEmptyString(contextSnapshot.issueId); + if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON) { + const gate = await evaluateScheduledRetryGate({ run, agent, contextSnapshot, retryReason }); + if (!gate.allowed) { + await appendRunEvent(run, await nextRunEventSeq(run.id), { + eventType: "lifecycle", + stream: "system", + level: "warn", + message: gate.reason, + payload: { + retryReason, + scheduledRetryAttempt: nextAttempt, + maxAttempts, + ...gate.details, + }, + }); + return { + outcome: "not_scheduled" as const, + reason: gate.reason, + errorCode: gate.errorCode, + issueId: gate.issueId, + }; + } + } const taskKey = deriveTaskKeyWithHeartbeatFallback(contextSnapshot, null); const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey); const retryContextSnapshot: Record = { @@ -4356,8 +4686,158 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) ...(transientRetryNotBefore ? { transientRetryNotBefore: transientRetryNotBefore.toISOString() } : {}), ...(codexTransientFallbackMode ? { codexTransientFallbackMode } : {}), }; + const maxTurnContinuationIdempotencyKey = retryReason === MAX_TURN_CONTINUATION_RETRY_REASON + ? `max-turn-continuation:${run.companyId}:${issueId ?? "no-issue"}:${run.id}:${schedule.attempt}` + : null; + + type ScheduledRetryTransactionResult = + | { + outcome: "scheduled"; + run: typeof heartbeatRuns.$inferSelect; + reusedExisting: boolean; + } + | { + outcome: "not_scheduled"; + reason: string; + errorCode: + | "issue_not_found" + | "issue_reassigned" + | "issue_cancelled" + | "issue_terminal_status" + | "issue_not_in_progress" + | "issue_execution_lock_changed"; + issueId: string | null; + details: Record; + }; + + const scheduleResult = await db.transaction(async (tx): Promise => { + if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON) { + if (issueId) { + await tx.execute( + sql`select id from issues where company_id = ${run.companyId} and id = ${issueId} for update`, + ); + } else { + await tx.execute( + sql`select id from heartbeat_runs where company_id = ${run.companyId} and id = ${run.id} for update`, + ); + } + + const existingContinuation = await tx + .select() + .from(heartbeatRuns) + .where( + and( + eq(heartbeatRuns.companyId, run.companyId), + eq(heartbeatRuns.retryOfRunId, run.id), + eq(heartbeatRuns.scheduledRetryReason, retryReason), + eq(heartbeatRuns.scheduledRetryAttempt, schedule.attempt), + inArray(heartbeatRuns.status, [...MAX_TURN_CONTINUATION_LIVE_RUN_STATUSES]), + issueId + ? sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}` + : sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' is null`, + ), + ) + .orderBy(asc(heartbeatRuns.createdAt), asc(heartbeatRuns.id)) + .limit(1) + .then((rows) => rows[0] ?? null); + + if (existingContinuation) { + if (existingContinuation.wakeupRequestId) { + const existingWakeup = await tx + .select({ coalescedCount: agentWakeupRequests.coalescedCount }) + .from(agentWakeupRequests) + .where(eq(agentWakeupRequests.id, existingContinuation.wakeupRequestId)) + .then((rows) => rows[0] ?? null); + + await tx + .update(agentWakeupRequests) + .set({ + coalescedCount: (existingWakeup?.coalescedCount ?? 0) + 1, + updatedAt: now, + }) + .where(eq(agentWakeupRequests.id, existingContinuation.wakeupRequestId)); + } + + return { + outcome: "scheduled", + run: existingContinuation, + reusedExisting: true, + }; + } + + if (issueId) { + const lockedIssue = await tx + .select({ + id: issues.id, + status: issues.status, + assigneeAgentId: issues.assigneeAgentId, + executionRunId: issues.executionRunId, + }) + .from(issues) + .where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId))) + .then((rows) => rows[0] ?? null); + + if (!lockedIssue) { + return { + outcome: "not_scheduled", + reason: "Scheduled max-turn continuation suppressed because the target issue no longer exists", + errorCode: "issue_not_found", + issueId, + details: { issueId }, + }; + } + + if (lockedIssue.assigneeAgentId !== run.agentId) { + return { + outcome: "not_scheduled", + reason: "Scheduled max-turn continuation suppressed because issue ownership changed", + errorCode: "issue_reassigned", + issueId, + details: { + issueId, + previousAssigneeAgentId: run.agentId, + currentAssigneeAgentId: lockedIssue.assigneeAgentId, + }, + }; + } + + if (lockedIssue.status === "cancelled" || lockedIssue.status === "done") { + return { + outcome: "not_scheduled", + reason: `Scheduled max-turn continuation suppressed because issue reached terminal status (${lockedIssue.status})`, + errorCode: lockedIssue.status === "cancelled" ? "issue_cancelled" : "issue_terminal_status", + issueId, + details: { issueId, currentStatus: lockedIssue.status }, + }; + } + + if (lockedIssue.status !== "in_progress") { + return { + outcome: "not_scheduled", + reason: `Scheduled max-turn continuation suppressed because issue is no longer in_progress (current status: ${lockedIssue.status})`, + errorCode: "issue_not_in_progress", + issueId, + details: { issueId, currentStatus: lockedIssue.status, requiredStatus: "in_progress" }, + }; + } + + if (lockedIssue.executionRunId !== run.id) { + return { + outcome: "not_scheduled", + reason: + "Scheduled max-turn continuation suppressed because the issue execution lock belongs to a different run", + errorCode: "issue_execution_lock_changed", + issueId, + details: { + issueId, + expectedExecutionRunId: run.id, + currentExecutionRunId: lockedIssue.executionRunId, + }, + }; + } + } + } - const retryRun = await db.transaction(async (tx) => { const wakeupRequest = await tx .insert(agentWakeupRequests) .values({ @@ -4379,6 +4859,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) status: "queued", requestedByActorType: "system", requestedByActorId: null, + idempotencyKey: maxTurnContinuationIdempotencyKey, updatedAt: now, }) .returning() @@ -4425,9 +4906,62 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) .where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id))); } - return scheduledRun; + return { + outcome: "scheduled", + run: scheduledRun, + reusedExisting: false, + }; }); + if (scheduleResult.outcome === "not_scheduled") { + await appendRunEvent(run, await nextRunEventSeq(run.id), { + eventType: "lifecycle", + stream: "system", + level: "warn", + message: scheduleResult.reason, + payload: { + retryReason, + scheduledRetryAttempt: nextAttempt, + maxAttempts, + ...scheduleResult.details, + }, + }); + return { + outcome: "not_scheduled" as const, + reason: scheduleResult.reason, + errorCode: scheduleResult.errorCode, + issueId: scheduleResult.issueId, + }; + } + + const retryRun = scheduleResult.run; + const dueAt = retryRun.scheduledRetryAt ? new Date(retryRun.scheduledRetryAt) : schedule.dueAt; + + if (scheduleResult.reusedExisting) { + await appendRunEvent(run, await nextRunEventSeq(run.id), { + eventType: "lifecycle", + stream: "system", + level: "info", + message: `Reused existing max-turn continuation ${retryRun.scheduledRetryAttempt}/${schedule.maxAttempts}`, + payload: { + retryRunId: retryRun.id, + retryReason, + idempotencyKey: maxTurnContinuationIdempotencyKey, + scheduledRetryAttempt: retryRun.scheduledRetryAttempt, + scheduledRetryAt: dueAt.toISOString(), + }, + }); + + return { + outcome: "scheduled" as const, + run: retryRun, + dueAt, + attempt: retryRun.scheduledRetryAttempt, + maxAttempts: schedule.maxAttempts, + reusedExisting: true, + }; + } + await appendRunEvent(run, await nextRunEventSeq(run.id), { eventType: "lifecycle", stream: "system", @@ -4449,7 +4983,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) return { outcome: "scheduled" as const, run: retryRun, - dueAt: schedule.dueAt, + dueAt, attempt: schedule.attempt, maxAttempts: schedule.maxAttempts, }; @@ -4471,86 +5005,35 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) const promotedRunIds: string[] = []; for (const dueRun of dueRuns) { - const dueRunIssueId = readNonEmptyString(parseObject(dueRun.contextSnapshot).issueId); - if (dueRunIssueId) { - const issue = await db - .select({ - id: issues.id, - status: issues.status, - assigneeAgentId: issues.assigneeAgentId, - executionRunId: issues.executionRunId, - }) - .from(issues) - .where(and(eq(issues.id, dueRunIssueId), eq(issues.companyId, dueRun.companyId))) - .then((rows) => rows[0] ?? null); + const agent = await getAgent(dueRun.agentId); + if (!agent) { + await cancelScheduledRetryForGate(dueRun, { + allowed: false, + reason: "Scheduled retry suppressed because the agent no longer exists", + errorCode: "agent_not_invokable", + issueId: readNonEmptyString(parseObject(dueRun.contextSnapshot).issueId), + details: { agentId: dueRun.agentId }, + }, now); + continue; + } - if (issue && (issue.assigneeAgentId !== dueRun.agentId || issue.status === "cancelled")) { - const issueCancelled = issue.status === "cancelled"; - const reason = issueCancelled - ? "Cancelled because the issue was cancelled before the scheduled retry became due" - : "Cancelled because the issue was reassigned before the scheduled retry became due"; - const cancelled = await db - .update(heartbeatRuns) - .set({ - status: "cancelled", - finishedAt: now, - error: reason, - errorCode: issueCancelled ? "issue_cancelled" : "issue_reassigned", - updatedAt: now, - }) - .where( - and( - eq(heartbeatRuns.id, dueRun.id), - eq(heartbeatRuns.status, "scheduled_retry"), - lte(heartbeatRuns.scheduledRetryAt, now), - ), - ) - .returning() - .then((rows) => rows[0] ?? null); - - if (!cancelled) continue; - - if (cancelled.wakeupRequestId) { - await db - .update(agentWakeupRequests) - .set({ - status: "cancelled", - finishedAt: now, - error: reason, - updatedAt: now, - }) - .where(eq(agentWakeupRequests.id, cancelled.wakeupRequestId)); - } - - if (issue.executionRunId === cancelled.id) { - await db - .update(issues) - .set({ - executionRunId: null, - executionAgentNameKey: null, - executionLockedAt: null, - updatedAt: now, - }) - .where(and(eq(issues.id, issue.id), eq(issues.executionRunId, cancelled.id))); - } - - await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), { - eventType: "lifecycle", - stream: "system", - level: "warn", - message: issueCancelled - ? "Scheduled retry cancelled because issue was cancelled before it became due" - : "Scheduled retry cancelled because issue ownership changed before it became due", - payload: { - issueId: issue.id, - issueStatus: issue.status, - scheduledRetryAttempt: cancelled.scheduledRetryAttempt, - scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null, - scheduledRetryReason: cancelled.scheduledRetryReason, - previousRetryAgentId: cancelled.agentId, - currentAssigneeAgentId: issue.assigneeAgentId, - }, - }); + const contextSnapshot = parseObject(dueRun.contextSnapshot); + const gate = await evaluateScheduledRetryGate({ + run: dueRun, + agent, + contextSnapshot, + retryReason: dueRun.scheduledRetryReason, + enforceIssueExecutionLock: dueRun.scheduledRetryReason === MAX_TURN_CONTINUATION_RETRY_REASON, + }); + if (!gate.allowed) { + if ( + gate.errorCode === "issue_not_found" && + dueRun.scheduledRetryReason !== MAX_TURN_CONTINUATION_RETRY_REASON + ) { + // Preserve legacy transient retry behavior for runs that only carry a + // loose task context rather than a persisted issue row. + } else { + await cancelScheduledRetryForGate(dueRun, gate, now); continue; } } @@ -4617,6 +5100,20 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) }; } + function parseMaxTurnContinuationPolicy(agent: typeof agents.$inferSelect): MaxTurnContinuationPolicy { + const runtimeConfig = parseObject(agent.runtimeConfig); + const heartbeat = parseObject(runtimeConfig.heartbeat); + const configured = parseObject(heartbeat.maxTurnContinuation); + const rawMaxAttempts = Math.floor(asNumber(configured.maxAttempts, MAX_TURN_CONTINUATION_DEFAULT_MAX_ATTEMPTS)); + const rawDelayMs = Math.floor(asNumber(configured.delayMs, MAX_TURN_CONTINUATION_DEFAULT_DELAY_MS)); + + return { + enabled: asBoolean(configured.enabled, true), + maxAttempts: Math.max(0, Math.min(MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP, rawMaxAttempts)), + delayMs: Math.max(0, Math.min(MAX_TURN_CONTINUATION_MAX_DELAY_MS, rawDelayMs)), + }; + } + function issueRunPriorityRank(priority: string | null | undefined) { switch (priority) { case "critical": @@ -4857,6 +5354,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) | "issue_not_found" | "issue_assignee_changed" | "issue_terminal_status" + | "issue_not_in_progress" + | "issue_execution_lock_changed" | "issue_review_participant_changed"; details: Record; }; @@ -4871,6 +5370,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) id: issues.id, status: issues.status, assigneeAgentId: issues.assigneeAgentId, + executionRunId: issues.executionRunId, executionState: issues.executionState, }) .from(issues) @@ -4889,6 +5389,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) const wakeCommentId = deriveCommentId(context, null); const isInteractionWake = allowsIssueInteractionWake(context); const resumeIntent = context.resumeIntent === true || context.followUpRequested === true; + const retryReason = readNonEmptyString(context.retryReason) ?? run.scheduledRetryReason ?? null; if (issue.assigneeAgentId !== run.agentId && !isInteractionWake) { return { @@ -4915,6 +5416,29 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) } } + if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && issue.status !== "in_progress") { + return { + stale: true, + errorCode: "issue_not_in_progress", + reason: `Cancelled because max-turn continuation issue is no longer in_progress (current status: ${issue.status}) before the queued run could start`, + details: { issueId, currentStatus: issue.status, requiredStatus: "in_progress" }, + }; + } + + if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && issue.executionRunId !== run.id) { + return { + stale: true, + errorCode: "issue_execution_lock_changed", + reason: + "Cancelled because max-turn continuation no longer owns the issue execution lock before the queued run could start", + details: { + issueId, + expectedExecutionRunId: run.id, + currentExecutionRunId: issue.executionRunId, + }, + }; + } + if (issue.status === "in_review") { const executionState = parseIssueExecutionState(issue.executionState); const currentParticipant = executionState?.currentParticipant ?? null; @@ -6683,7 +7207,28 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) ); } } - if (outcome === "failed" && readTransientRecoveryContractFromRun(livenessRun)) { + if (outcome === "failed" && isMaxTurnExhaustionRun(livenessRun)) { + const policy = parseMaxTurnContinuationPolicy(agent); + if (policy.enabled && policy.maxAttempts > 0) { + await scheduleBoundedRetryForRun(livenessRun, agent, { + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON, + maxAttempts: policy.maxAttempts, + delayMs: policy.delayMs, + }); + } else { + await appendRunEvent(livenessRun, await nextRunEventSeq(livenessRun.id), { + eventType: "lifecycle", + stream: "system", + level: "warn", + message: "Max-turn continuation suppressed because the policy is disabled", + payload: { + retryReason: MAX_TURN_CONTINUATION_RETRY_REASON, + policy, + }, + }); + } + } else if (outcome === "failed" && readTransientRecoveryContractFromRun(livenessRun)) { await scheduleBoundedRetryForRun(livenessRun, agent); } await finalizeIssueCommentPolicy(livenessRun, agent); @@ -8443,6 +8988,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {}) random?: () => number; retryReason?: string; wakeReason?: string; + maxAttempts?: number; + delayMs?: number; }, ) => { const run = await getRun(runId, { unsafeFullResultJson: true }); diff --git a/ui/src/components/AgentConfigForm.tsx b/ui/src/components/AgentConfigForm.tsx index c36463ab..5e232665 100644 --- a/ui/src/components/AgentConfigForm.tsx +++ b/ui/src/components/AgentConfigForm.tsx @@ -27,7 +27,7 @@ import { } from "@/components/ui/popover"; import { Button } from "@/components/ui/button"; import { FolderOpen, Heart, ChevronDown, X } from "lucide-react"; -import { cn } from "../lib/utils"; +import { asBoolean, asFiniteNumber, asObject, cn } from "../lib/utils"; import { extractModelName, extractProviderId } from "../lib/model-utils"; import { queryKeys } from "../lib/queryKeys"; import { useCompany } from "../context/CompanyContext"; @@ -175,6 +175,19 @@ const claudeThinkingEffortOptions = [ { id: "high", label: "High" }, ] as const; +const MAX_TURN_CONTINUATION_DEFAULT_MAX_ATTEMPTS = 2; +const MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP = 10; +const MAX_TURN_CONTINUATION_DEFAULT_DELAY_SEC = 1; +const MAX_TURN_CONTINUATION_MAX_DELAY_SEC = 300; + +function clampInteger(value: number, min: number, max: number) { + return Math.max(min, Math.min(max, Math.floor(value))); +} + +function clampDelayMsFromSeconds(value: number) { + return clampInteger(value, 0, MAX_TURN_CONTINUATION_MAX_DELAY_SEC) * 1000; +} + /* ---- Form ---- */ @@ -628,6 +641,27 @@ export function AgentConfigForm(props: AgentConfigFormProps) { const currentDefaultEnvironmentId = isCreate ? val!.defaultEnvironmentId ?? "" : eff("identity", "defaultEnvironmentId", props.agent.defaultEnvironmentId ?? ""); + const effectiveHeartbeat = asObject(effectiveRuntimeConfig.heartbeat); + const maxTurnContinuation = asObject(effectiveHeartbeat.maxTurnContinuation); + const maxTurnContinuationEnabled = asBoolean(maxTurnContinuation.enabled, true); + const maxTurnContinuationMaxAttempts = clampInteger( + asFiniteNumber(maxTurnContinuation.maxAttempts, MAX_TURN_CONTINUATION_DEFAULT_MAX_ATTEMPTS), + 0, + MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP, + ); + const maxTurnContinuationDelaySec = clampInteger( + asFiniteNumber(maxTurnContinuation.delayMs, MAX_TURN_CONTINUATION_DEFAULT_DELAY_SEC * 1000) / 1000, + 0, + MAX_TURN_CONTINUATION_MAX_DELAY_SEC, + ); + + function updateMaxTurnContinuation(patch: Record) { + mark("heartbeat", "maxTurnContinuation", { + ...maxTurnContinuation, + ...patch, + }); + } + return (
{/* ---- Floating Save button (edit mode, when dirty) ---- */} @@ -1182,6 +1216,40 @@ export function AgentConfigForm(props: AgentConfigFormProps) { className={inputClass} /> +
+ updateMaxTurnContinuation({ enabled: v })} + /> + {maxTurnContinuationEnabled ? ( +
+ + + updateMaxTurnContinuation({ + maxAttempts: clampInteger(v, 0, MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP), + })} + immediate + className={inputClass} + /> + + + + updateMaxTurnContinuation({ + delayMs: clampDelayMsFromSeconds(v), + })} + immediate + className={inputClass} + /> + +
+ ) : null} +
diff --git a/ui/src/components/IssueRunLedger.test.tsx b/ui/src/components/IssueRunLedger.test.tsx index e3baabc4..9913971a 100644 --- a/ui/src/components/IssueRunLedger.test.tsx +++ b/ui/src/components/IssueRunLedger.test.tsx @@ -317,6 +317,44 @@ describe("IssueRunLedger", () => { expect(container.textContent).toContain("Manual intervention required"); }); + it("labels max-turn stops and continuation retries without confusing them with per-run turns", () => { + renderLedger({ + runs: [ + createRun({ + runId: "run-scheduled-continuation", + status: "scheduled_retry", + finishedAt: null, + livenessState: null, + livenessReason: null, + retryOfRunId: "run-max-turns", + scheduledRetryAt: "2026-04-18T20:15:00.000Z", + scheduledRetryAttempt: 1, + scheduledRetryReason: "max_turns_continuation", + }), + createRun({ + runId: "run-max-turns", + resultJson: { stopReason: "max_turns_exhausted" }, + createdAt: "2026-04-18T19:57:00.000Z", + }), + createRun({ + runId: "run-continuation-exhausted", + status: "failed", + createdAt: "2026-04-18T19:56:00.000Z", + retryOfRunId: "run-max-turns", + scheduledRetryAttempt: 3, + scheduledRetryReason: "max_turns_continuation", + retryExhaustedReason: "Bounded retry exhausted after 3 scheduled attempts; no further automatic retry will be queued", + }), + ], + }); + + expect(container.textContent).toContain("Continuation scheduled"); + expect(container.textContent).toContain("Max-turn continuation"); + expect(container.textContent).toContain("Next continuation"); + expect(container.textContent).toContain("Stop max turns exhausted"); + expect(container.textContent).toContain("Continuation exhausted"); + }); + it("shows timeout, cancel, and budget stop reasons without raw logs", () => { renderLedger({ runs: [ diff --git a/ui/src/components/IssueRunLedger.tsx b/ui/src/components/IssueRunLedger.tsx index e45bd4ae..7ae52e1a 100644 --- a/ui/src/components/IssueRunLedger.tsx +++ b/ui/src/components/IssueRunLedger.tsx @@ -311,6 +311,7 @@ function stopReasonLabel(run: RunForIssue) { if (timeoutFired || stopReason === "timeout") { return timeoutText ? `timeout (${timeoutText})` : "timeout"; } + if (stopReason === "max_turns_exhausted" || stopReason === "turn_limit_exhausted") return "max turns exhausted"; if (stopReason === "budget_paused") return "budget paused"; if (stopReason === "cancelled") return "cancelled"; if (stopReason === "paused") return "paused by board"; diff --git a/ui/src/components/agent-config-primitives.tsx b/ui/src/components/agent-config-primitives.tsx index d5c0f4ca..6cfa9985 100644 --- a/ui/src/components/agent-config-primitives.tsx +++ b/ui/src/components/agent-config-primitives.tsx @@ -56,6 +56,9 @@ export const help: Record = { wakeOnDemand: "Allow this agent to be woken by assignments, API calls, UI actions, or automated systems.", cooldownSec: "Minimum seconds between consecutive heartbeat runs.", maxConcurrentRuns: "Maximum number of heartbeat runs that can execute simultaneously for this agent.", + maxTurnContinuationEnabled: "Automatically queue bounded continuation runs when an adapter stops because its per-run turn cap was exhausted.", + maxTurnContinuationMaxAttempts: "Maximum automatic continuations after one max-turn stop. This is separate from max turns per run.", + maxTurnContinuationDelaySec: "Seconds to wait before starting each max-turn continuation.", budgetMonthlyCents: "Monthly spending limit in cents. 0 means no limit.", }; diff --git a/ui/src/lib/agent-config-patch.test.ts b/ui/src/lib/agent-config-patch.test.ts index caa6a49f..7cdc79e2 100644 --- a/ui/src/lib/agent-config-patch.test.ts +++ b/ui/src/lib/agent-config-patch.test.ts @@ -108,6 +108,35 @@ describe("buildAgentUpdatePatch", () => { expect(patch.adapterConfig).toBeUndefined(); }); + it("writes max-turn continuation policy under runtimeConfig.heartbeat", () => { + const patch = buildAgentUpdatePatch( + makeAgent(), + makeOverlay({ + heartbeat: { + maxTurnContinuation: { + enabled: true, + maxAttempts: 3, + delayMs: 1000, + }, + }, + }), + ); + + expect(patch).toEqual({ + runtimeConfig: { + heartbeat: { + enabled: true, + intervalSec: 300, + maxTurnContinuation: { + enabled: true, + maxAttempts: 3, + delayMs: 1000, + }, + }, + }, + }); + }); + it("merges cheap profile changes onto existing runtimeConfig.modelProfiles state", () => { const agent = makeAgent(); agent.runtimeConfig = { diff --git a/ui/src/lib/runRetryState.test.ts b/ui/src/lib/runRetryState.test.ts index cc6ae84c..9549cc30 100644 --- a/ui/src/lib/runRetryState.test.ts +++ b/ui/src/lib/runRetryState.test.ts @@ -5,6 +5,7 @@ describe("runRetryState", () => { it("formats internal retry reasons for operators", () => { expect(formatRetryReason("transient_failure")).toBe("Transient failure"); expect(formatRetryReason("issue_continuation_needed")).toBe("Continuation needed"); + expect(formatRetryReason("max_turns_continuation")).toBe("Max-turn continuation"); expect(formatRetryReason("custom_reason")).toBe("custom reason"); }); @@ -24,6 +25,22 @@ describe("runRetryState", () => { }); }); + it("describes max-turn continuation retries distinctly", () => { + expect( + describeRunRetryState({ + status: "scheduled_retry", + retryOfRunId: "run-max-turns", + scheduledRetryAttempt: 1, + scheduledRetryReason: "max_turns_continuation", + scheduledRetryAt: "2026-04-18T20:15:00.000Z", + }), + ).toMatchObject({ + kind: "scheduled", + badgeLabel: "Continuation scheduled", + detail: "Attempt 1 · Max-turn continuation", + }); + }); + it("describes exhausted retries", () => { expect( describeRunRetryState({ diff --git a/ui/src/lib/runRetryState.ts b/ui/src/lib/runRetryState.ts index 1df9c788..0b5d356b 100644 --- a/ui/src/lib/runRetryState.ts +++ b/ui/src/lib/runRetryState.ts @@ -24,6 +24,7 @@ const RETRY_REASON_LABELS: Record = { process_lost: "Process lost", assignment_recovery: "Assignment recovery", issue_continuation_needed: "Continuation needed", + max_turns_continuation: "Max-turn continuation", }; function readNonEmptyString(value: unknown) { @@ -51,6 +52,7 @@ export function describeRunRetryState(run: RetryAwareRun): RunRetryStateSummary const retryOfRunId = readNonEmptyString(run.retryOfRunId); const exhaustedReason = readNonEmptyString(run.retryExhaustedReason); const dueAt = run.scheduledRetryAt ? formatDateTime(run.scheduledRetryAt) : null; + const isMaxTurnContinuation = run.scheduledRetryReason === "max_turns_continuation"; const hasRetryMetadata = Boolean(retryOfRunId) || Boolean(reasonLabel) @@ -63,10 +65,12 @@ export function describeRunRetryState(run: RetryAwareRun): RunRetryStateSummary if (run.status === "scheduled_retry") { return { kind: "scheduled", - badgeLabel: "Retry scheduled", + badgeLabel: isMaxTurnContinuation ? "Continuation scheduled" : "Retry scheduled", tone: "border-cyan-500/30 bg-cyan-500/10 text-cyan-700 dark:text-cyan-300", detail: joinFragments([attemptLabel, reasonLabel]), - secondary: dueAt ? `Next retry ${dueAt}` : "Next retry pending schedule", + secondary: dueAt + ? `${isMaxTurnContinuation ? "Next continuation" : "Next retry"} ${dueAt}` + : `${isMaxTurnContinuation ? "Next continuation" : "Next retry"} pending schedule`, retryOfRunId, }; } @@ -74,7 +78,7 @@ export function describeRunRetryState(run: RetryAwareRun): RunRetryStateSummary if (exhaustedReason) { return { kind: "exhausted", - badgeLabel: "Retry exhausted", + badgeLabel: isMaxTurnContinuation ? "Continuation exhausted" : "Retry exhausted", tone: "border-amber-500/30 bg-amber-500/10 text-amber-700 dark:text-amber-300", detail: joinFragments([attemptLabel, reasonLabel, "Automatic retries exhausted"]), secondary: exhaustedReason.includes("Manual intervention required") @@ -86,7 +90,7 @@ export function describeRunRetryState(run: RetryAwareRun): RunRetryStateSummary return { kind: "attempted", - badgeLabel: "Retried run", + badgeLabel: isMaxTurnContinuation ? "Continued run" : "Retried run", tone: "border-slate-500/20 bg-slate-500/10 text-slate-700 dark:text-slate-300", detail: joinFragments([attemptLabel, reasonLabel]), secondary: null, diff --git a/ui/src/lib/utils.ts b/ui/src/lib/utils.ts index 65a2e42c..ee47b146 100644 --- a/ui/src/lib/utils.ts +++ b/ui/src/lib/utils.ts @@ -7,6 +7,20 @@ export function cn(...inputs: ClassValue[]) { return twMerge(clsx(inputs)); } +export function asObject(value: unknown): Record { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? value as Record + : {}; +} + +export function asBoolean(value: unknown, fallback: boolean) { + return typeof value === "boolean" ? value : fallback; +} + +export function asFiniteNumber(value: unknown, fallback: number) { + return typeof value === "number" && Number.isFinite(value) ? value : fallback; +} + export function formatCents(cents: number): string { return `$${(cents / 100).toLocaleString("en-US", { minimumFractionDigits: 2, maximumFractionDigits: 2 })}`; }