[codex] Retry max-turn exhausted heartbeats (#5096)

## Thinking Path

> - Paperclip orchestrates AI agents for autonomous companies, and
heartbeat execution is the control-plane loop that keeps assigned work
moving.
> - Max-turn exhaustion is a recoverable local-adapter stop condition
for Claude and Gemini agents when a run needs another heartbeat to
continue safely.
> - The previous behavior could leave max-turn continuation details hard
to inspect, and duplicate/stale continuation wakes could keep running
after issue state changed.
> - The adapter layer also needed to avoid trusting arbitrary
stdout/stderr text as scheduler control metadata.
> - This pull request adds bounded max-turn continuation scheduling,
visible retry state, structured stop metadata handling, and
stale/duplicate continuation guards.
> - The benefit is safer automatic continuation after max-turn stops,
clearer operator visibility, and fewer duplicate or stale agent runs.

## What Changed

- Replaces closed PR #4952, whose head repository was deleted.
- Rebases the recovered max-turn continuation branch onto current
`paperclipai/paperclip:master`.
- Adds max-turn continuation scheduling and retry-state plumbing for
heartbeat runs.
- Adds stale/duplicate continuation suppression when issue status,
ownership, or execution locks change.
- Normalizes Claude/Gemini max-turn detection around structured stop
metadata instead of unstructured stdout/stderr text.
- Surfaces max-turn continuation settings and retry visibility in the
board UI.
- Adds focused server, adapter, and UI tests for max-turn stop metadata,
retry scheduling, stale queued-run invalidation, adapter
parsing/execution, run ledger display, and agent config patching.

## Verification

- `pnpm install --no-frozen-lockfile` to refresh local dependencies
after rebasing onto current `master`.
- `pnpm run preflight:workspace-links && pnpm exec vitest run
server/src/__tests__/claude-local-adapter.test.ts
server/src/__tests__/claude-local-execute.test.ts
server/src/__tests__/gemini-local-adapter.test.ts
server/src/__tests__/gemini-local-execute.test.ts
server/src/__tests__/heartbeat-retry-scheduling.test.ts
server/src/__tests__/heartbeat-stale-queue-invalidation.test.ts
server/src/services/heartbeat-stop-metadata.test.ts
ui/src/components/IssueRunLedger.test.tsx
ui/src/lib/agent-config-patch.test.ts ui/src/lib/runRetryState.test.ts
--testTimeout=20000`
- `pnpm --filter @paperclipai/adapter-claude-local typecheck && pnpm
--filter @paperclipai/adapter-gemini-local typecheck && pnpm --filter
@paperclipai/server typecheck && pnpm --filter @paperclipai/ui
typecheck`
- UI screenshot note: the UI changes are limited to config/ledger state
rendering rather than layout changes; component/unit coverage above
verifies the rendered behavior.

## Risks

- Medium behavior risk: heartbeat retry gating now suppresses max-turn
continuations when issue state or execution locks drift, so any callers
that relied on stale continuations running will now see cancellation
instead.
- Low adapter risk: Claude/Gemini unstructured text no longer triggers
max-turn scheduler metadata, so only structured stop signals and Gemini
exit code 53 are trusted.
- No database migrations.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected — check the roadmap
first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI Codex coding agent, GPT-5-class model, tool-enabled local
repository editing and command execution.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [x] If this change affects the UI, I have included before/after
screenshots (not applicable: state/default rendering only; covered by
component/unit tests)
- [x] I have updated relevant documentation to reflect my changes (not
applicable: no user-facing command or docs contract changed)
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge

---------

Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Dotta 2026-05-03 11:30:48 -05:00 committed by GitHub
parent 57229d0f24
commit 15eac43b43
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1915 additions and 120 deletions

View file

@ -21,6 +21,15 @@ describe("claude_local max-turn detection", () => {
).toBe(true);
});
it("checks every structured stop field for max-turn exhaustion", () => {
expect(
isClaudeMaxTurnsResult({
stop_reason: "end_turn",
stopReason: "max_turns_exhausted",
}),
).toBe(true);
});
it("returns false for non-max-turn results", () => {
expect(
isClaudeMaxTurnsResult({
@ -29,6 +38,15 @@ describe("claude_local max-turn detection", () => {
}),
).toBe(false);
});
it("does not detect max-turn exhaustion from unstructured result text", () => {
expect(
isClaudeMaxTurnsResult({
subtype: "error",
result: "Tool output said: Maximum turns reached.",
}),
).toBe(false);
});
});
describe("claude_local ui stdout parser", () => {

View file

@ -19,6 +19,24 @@ process.exit(${exit});
await fs.chmod(commandPath, 0o755);
}
async function writeTextFailingClaudeCommand(
commandPath: string,
options: { stdout?: string; stderr?: string; exitCode?: number },
): Promise<void> {
const exit = options.exitCode ?? 1;
const script = `#!/usr/bin/env node
if (${JSON.stringify(options.stdout ?? "")}) {
process.stdout.write(${JSON.stringify(options.stdout ?? "")});
}
if (${JSON.stringify(options.stderr ?? "")}) {
process.stderr.write(${JSON.stringify(options.stderr ?? "")});
}
process.exit(${exit});
`;
await fs.writeFile(commandPath, script, "utf8");
await fs.chmod(commandPath, 0o755);
}
async function writeFakeClaudeCommand(commandPath: string): Promise<void> {
const script = `#!/usr/bin/env node
const fs = require("node:fs");
@ -372,6 +390,119 @@ describe("claude execute", () => {
}
});
it("normalizes max-turn exhaustion into scheduler stop metadata", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-exec-max-turns-"));
const resultEvent = {
type: "result",
subtype: "error_max_turns",
session_id: "claude-session-1",
is_error: true,
result: "Maximum turns reached.",
usage: { input_tokens: 1, cache_read_input_tokens: 0, output_tokens: 1 },
};
const { workspace, commandPath, restore } = await setupExecuteEnv(root, {
commandWriter: (commandPath) => writeFailingClaudeCommand(commandPath, { resultEvent }),
});
try {
const result = await execute({
runId: "run-max-turns",
agent: { id: "agent-1", companyId: "co-1", name: "Test", adapterType: "claude_local", adapterConfig: {} },
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
config: {
command: commandPath,
cwd: workspace,
promptTemplate: "Do work.",
},
context: {},
authToken: "tok",
onLog: async () => {},
});
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("max_turns_exhausted");
expect(result.errorFamily).toBeNull();
expect(result.resultJson).toMatchObject({ stopReason: "max_turns_exhausted" });
expect(result.clearSession).toBe(true);
} finally {
restore();
await fs.rm(root, { recursive: true, force: true });
}
});
it("does not normalize unstructured max-turn text into scheduler stop metadata", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-exec-max-turn-text-"));
const resultEvent = {
type: "result",
subtype: "error",
session_id: "claude-session-1",
is_error: true,
result: "Tool output said: Maximum turns reached.",
};
const { workspace, commandPath, restore } = await setupExecuteEnv(root, {
commandWriter: (commandPath) => writeFailingClaudeCommand(commandPath, { resultEvent }),
});
try {
const result = await execute({
runId: "run-max-turns-text",
agent: { id: "agent-1", companyId: "co-1", name: "Test", adapterType: "claude_local", adapterConfig: {} },
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
config: {
command: commandPath,
cwd: workspace,
promptTemplate: "Do work.",
},
context: {},
authToken: "tok",
onLog: async () => {},
});
expect(result.exitCode).toBe(1);
expect(result.errorCode).not.toBe("max_turns_exhausted");
expect(result.resultJson?.stopReason).not.toBe("max_turns_exhausted");
expect(result.clearSession).toBe(false);
} finally {
restore();
await fs.rm(root, { recursive: true, force: true });
}
});
it("does not normalize fallback stdout/stderr max-turn text into scheduler stop metadata", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-exec-max-turn-fallback-"));
const { workspace, commandPath, restore } = await setupExecuteEnv(root, {
commandWriter: (commandPath) =>
writeTextFailingClaudeCommand(commandPath, {
stdout: "attacker-controlled tool output: max turns exhausted\n",
stderr: "Maximum turns reached.\n",
}),
});
try {
const result = await execute({
runId: "run-max-turns-fallback-text",
agent: { id: "agent-1", companyId: "co-1", name: "Test", adapterType: "claude_local", adapterConfig: {} },
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
config: {
command: commandPath,
cwd: workspace,
promptTemplate: "Do work.",
},
context: {},
authToken: "tok",
onLog: async () => {},
});
expect(result.exitCode).toBe(1);
expect(result.errorCode).not.toBe("max_turns_exhausted");
expect(result.resultJson?.stopReason).not.toBe("max_turns_exhausted");
expect(result.clearSession).toBe(false);
} finally {
restore();
await fs.rm(root, { recursive: true, force: true });
}
});
it("logs HOME, CLAUDE_CONFIG_DIR, and the resolved executable path in invocation metadata", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-claude-execute-meta-"));
const workspace = path.join(root, "workspace");

View file

@ -1,5 +1,9 @@
import { describe, expect, it, vi } from "vitest";
import { isGeminiUnknownSessionError, parseGeminiJsonl } from "@paperclipai/adapter-gemini-local/server";
import {
isGeminiTurnLimitResult,
isGeminiUnknownSessionError,
parseGeminiJsonl,
} from "@paperclipai/adapter-gemini-local/server";
import { parseGeminiStdoutLine } from "@paperclipai/adapter-gemini-local/ui";
import { printGeminiStreamEvent } from "@paperclipai/adapter-gemini-local/cli";
@ -79,6 +83,27 @@ describe("gemini_local stale session detection", () => {
});
});
describe("gemini_local turn-limit detection", () => {
it("detects structured turn-limit signals and exit code 53", () => {
expect(isGeminiTurnLimitResult({ status: "turn_limit" })).toBe(true);
expect(isGeminiTurnLimitResult({ stopReason: "max_turns_exhausted" })).toBe(true);
expect(isGeminiTurnLimitResult(null, 53)).toBe(true);
});
it("checks every structured stop field for turn-limit exhaustion", () => {
expect(
isGeminiTurnLimitResult({
status: "success",
stopReason: "turn_limit_exhausted",
}),
).toBe(true);
});
it("does not detect turn-limit exhaustion from unstructured error text", () => {
expect(isGeminiTurnLimitResult({ error: "max_turns reached" })).toBe(false);
});
});
describe("gemini_local ui stdout parser", () => {
it("parses assistant, thinking, and result events", () => {
const ts = "2026-03-08T00:00:00.000Z";

View file

@ -39,6 +39,35 @@ console.log(JSON.stringify({
await fs.chmod(commandPath, 0o755);
}
async function writeFailingGeminiCommand(
commandPath: string,
options: {
stdoutLines?: Array<Record<string, unknown>>;
stdout?: string;
stderr?: string;
exitCode?: number;
},
): Promise<void> {
const stdoutLines = options.stdoutLines ?? [];
const stdout = options.stdout ?? "";
const stderr = options.stderr ?? "";
const exit = options.exitCode ?? 1;
const script = `#!/usr/bin/env node
for (const line of ${JSON.stringify(stdoutLines.map((line) => JSON.stringify(line)))}) {
console.log(line);
}
if (${JSON.stringify(stdout)}) {
process.stdout.write(${JSON.stringify(stdout)});
}
if (${JSON.stringify(stderr)}) {
console.error(${JSON.stringify(stderr)});
}
process.exit(${exit});
`;
await fs.writeFile(commandPath, script, "utf8");
await fs.chmod(commandPath, 0o755);
}
type CapturePayload = {
argv: string[];
paperclipEnvKeys: string[];
@ -169,6 +198,144 @@ describe("gemini execute", () => {
}
});
it("normalizes turn-limit exhaustion into scheduler stop metadata", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-max-turns-"));
const workspace = path.join(root, "workspace");
const commandPath = path.join(root, "gemini");
await fs.mkdir(workspace, { recursive: true });
await writeFailingGeminiCommand(commandPath, {
stdoutLines: [
{
type: "result",
subtype: "error",
session_id: "gemini-session-1",
status: "turn_limit",
error: "Turn limit reached.",
},
],
});
const previousHome = process.env.HOME;
process.env.HOME = root;
try {
const result = await execute({
runId: "run-turn-limit",
agent: { id: "a1", companyId: "c1", name: "G", adapterType: "gemini_local", adapterConfig: {} },
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
config: {
command: commandPath,
cwd: workspace,
},
context: {},
authToken: "t",
onLog: async () => {},
});
expect(result.exitCode).toBe(1);
expect(result.errorCode).toBe("max_turns_exhausted");
expect(result.resultJson).toMatchObject({ stopReason: "max_turns_exhausted" });
expect(result.clearSession).toBe(true);
} finally {
if (previousHome === undefined) {
delete process.env.HOME;
} else {
process.env.HOME = previousHome;
}
await fs.rm(root, { recursive: true, force: true });
}
});
it("normalizes Gemini exit code 53 as max-turn exhaustion", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-exit-53-"));
const workspace = path.join(root, "workspace");
const commandPath = path.join(root, "gemini");
await fs.mkdir(workspace, { recursive: true });
await writeFailingGeminiCommand(commandPath, {
stderr: "Gemini stopped because the max turns limit was reached.",
exitCode: 53,
});
const previousHome = process.env.HOME;
process.env.HOME = root;
try {
const result = await execute({
runId: "run-exit-53",
agent: { id: "a1", companyId: "c1", name: "G", adapterType: "gemini_local", adapterConfig: {} },
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
config: {
command: commandPath,
cwd: workspace,
},
context: {},
authToken: "t",
onLog: async () => {},
});
expect(result.exitCode).toBe(53);
expect(result.errorCode).toBe("max_turns_exhausted");
expect(result.resultJson).toMatchObject({ stopReason: "max_turns_exhausted" });
expect(result.clearSession).toBe(true);
} finally {
if (previousHome === undefined) {
delete process.env.HOME;
} else {
process.env.HOME = previousHome;
}
await fs.rm(root, { recursive: true, force: true });
}
});
it("does not normalize unstructured turn-limit text into scheduler stop metadata", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-max-turn-text-"));
const workspace = path.join(root, "workspace");
const commandPath = path.join(root, "gemini");
await fs.mkdir(workspace, { recursive: true });
await writeFailingGeminiCommand(commandPath, {
stdoutLines: [
{
type: "result",
subtype: "error",
session_id: "gemini-session-1",
error: "Tool output said: maximum turns reached.",
},
],
stdout: "attacker-controlled transcript mentions turn limit reached\n",
stderr: "Gemini stopped because the max turns limit was reached.",
});
const previousHome = process.env.HOME;
process.env.HOME = root;
try {
const result = await execute({
runId: "run-turn-limit-text",
agent: { id: "a1", companyId: "c1", name: "G", adapterType: "gemini_local", adapterConfig: {} },
runtime: { sessionId: null, sessionParams: null, sessionDisplayId: null, taskKey: null },
config: {
command: commandPath,
cwd: workspace,
},
context: {},
authToken: "t",
onLog: async () => {},
});
expect(result.exitCode).toBe(1);
expect(result.errorCode).not.toBe("max_turns_exhausted");
expect(result.resultJson?.stopReason).not.toBe("max_turns_exhausted");
expect(result.clearSession).toBe(false);
} finally {
if (previousHome === undefined) {
delete process.env.HOME;
} else {
process.env.HOME = previousHome;
}
await fs.rm(root, { recursive: true, force: true });
}
});
it("uses a compact wake delta instead of the full heartbeat prompt when resuming a session", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "paperclip-gemini-resume-wake-"));
const workspace = path.join(root, "workspace");

View file

@ -1,15 +1,17 @@
import { randomUUID } from "node:crypto";
import { eq, sql } from "drizzle-orm";
import { and, eq, sql } from "drizzle-orm";
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
import {
agents,
agentRuntimeState,
agentWakeupRequests,
budgetPolicies,
companies,
createDb,
environmentLeases,
heartbeatRunEvents,
heartbeatRuns,
issueRelations,
issues,
} from "@paperclipai/db";
import {
@ -18,6 +20,8 @@ import {
} from "./helpers/embedded-postgres.js";
import {
BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS,
MAX_TURN_CONTINUATION_RETRY_REASON,
MAX_TURN_CONTINUATION_WAKE_REASON,
heartbeatService,
} from "../services/heartbeat.ts";
@ -44,10 +48,12 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => {
afterEach(async () => {
await db.delete(heartbeatRunEvents);
await db.delete(environmentLeases);
await db.delete(issueRelations);
await db.delete(issues);
await db.delete(heartbeatRuns);
await db.delete(agentWakeupRequests);
await db.delete(agentRuntimeState);
await db.delete(budgetPolicies);
await db.delete(agents);
await db.delete(companies);
});
@ -124,6 +130,92 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => {
});
}
async function seedMaxTurnFixture(input?: {
companyId?: string;
agentId?: string;
issueId?: string;
runId?: string;
now?: Date;
scheduledRetryAttempt?: number;
runtimeConfig?: Record<string, unknown>;
issueStatus?: string;
}) {
const companyId = input?.companyId ?? randomUUID();
const agentId = input?.agentId ?? randomUUID();
const issueId = input?.issueId ?? randomUUID();
const runId = input?.runId ?? randomUUID();
const now = input?.now ?? new Date("2026-04-20T12:00:00.000Z");
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
await db.insert(companies).values({
id: companyId,
name: "Paperclip",
issuePrefix,
requireBoardApprovalForNewAgents: false,
});
await db.insert(agents).values({
id: agentId,
companyId,
name: "ClaudeCoder",
role: "engineer",
status: "active",
adapterType: "claude_local",
adapterConfig: {},
runtimeConfig: input?.runtimeConfig ?? {
heartbeat: {
wakeOnDemand: true,
maxConcurrentRuns: 1,
maxTurnContinuation: {
enabled: true,
maxAttempts: 2,
delayMs: 1_000,
},
},
},
permissions: {},
});
await db.insert(heartbeatRuns).values({
id: runId,
companyId,
agentId,
invocationSource: "assignment",
triggerDetail: "system",
status: "failed",
error: "Maximum turns reached",
errorCode: "adapter_failed",
finishedAt: now,
scheduledRetryAttempt: input?.scheduledRetryAttempt ?? 0,
scheduledRetryReason: input?.scheduledRetryAttempt ? MAX_TURN_CONTINUATION_RETRY_REASON : null,
resultJson: {
stopReason: "max_turns_exhausted",
},
contextSnapshot: {
issueId,
wakeReason: "issue_assigned",
},
updatedAt: now,
createdAt: now,
});
await db.insert(issues).values({
id: issueId,
companyId,
title: "Continue after max turns",
status: input?.issueStatus ?? "in_progress",
priority: "medium",
assigneeAgentId: agentId,
executionRunId: runId,
executionAgentNameKey: "claudecoder",
executionLockedAt: now,
issueNumber: 1,
identifier: `${issuePrefix}-1`,
});
return { companyId, agentId, issueId, runId, now };
}
it("schedules a retry with durable metadata and only promotes it when due", async () => {
const companyId = randomUUID();
const agentId = randomUUID();
@ -218,6 +310,416 @@ describeEmbeddedPostgres("heartbeat bounded retry scheduling", () => {
expect(promotedRun?.status).toBe("queued");
});
it("schedules max-turn continuations with distinct retry metadata", async () => {
const { runId, now } = await seedMaxTurnFixture();
const scheduled = await heartbeat.scheduleBoundedRetry(runId, {
now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(scheduled.outcome).toBe("scheduled");
if (scheduled.outcome !== "scheduled") return;
expect(scheduled.attempt).toBe(1);
expect(scheduled.dueAt.toISOString()).toBe(new Date(now.getTime() + 1_000).toISOString());
const retryRun = await db
.select({
retryOfRunId: heartbeatRuns.retryOfRunId,
status: heartbeatRuns.status,
scheduledRetryAttempt: heartbeatRuns.scheduledRetryAttempt,
scheduledRetryReason: heartbeatRuns.scheduledRetryReason,
contextSnapshot: heartbeatRuns.contextSnapshot,
wakeupRequestId: heartbeatRuns.wakeupRequestId,
})
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, scheduled.run.id))
.then((rows) => rows[0] ?? null);
expect(retryRun).toMatchObject({
retryOfRunId: runId,
status: "scheduled_retry",
scheduledRetryAttempt: 1,
scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
});
expect((retryRun?.contextSnapshot as Record<string, unknown> | null)?.wakeReason).toBe(
MAX_TURN_CONTINUATION_WAKE_REASON,
);
expect((retryRun?.contextSnapshot as Record<string, unknown> | null)?.codexTransientFallbackMode ?? null).toBeNull();
const wakeupRequest = await db
.select({ reason: agentWakeupRequests.reason, payload: agentWakeupRequests.payload })
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, retryRun?.wakeupRequestId ?? ""))
.then((rows) => rows[0] ?? null);
expect(wakeupRequest?.reason).toBe(MAX_TURN_CONTINUATION_WAKE_REASON);
expect(wakeupRequest?.payload).toMatchObject({
retryOfRunId: runId,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
scheduledRetryAttempt: 1,
});
});
it("coalesces duplicate max-turn continuation schedules for the same source run and attempt", async () => {
const { issueId, runId, now } = await seedMaxTurnFixture();
const retryOptions = {
now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
};
const [first, second] = await Promise.all([
heartbeat.scheduleBoundedRetry(runId, retryOptions),
heartbeat.scheduleBoundedRetry(runId, retryOptions),
]);
expect(first.outcome).toBe("scheduled");
expect(second.outcome).toBe("scheduled");
if (first.outcome !== "scheduled" || second.outcome !== "scheduled") return;
expect(new Set([first.run.id, second.run.id]).size).toBe(1);
const retryRuns = await db
.select({
id: heartbeatRuns.id,
wakeupRequestId: heartbeatRuns.wakeupRequestId,
})
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.retryOfRunId, runId),
eq(heartbeatRuns.scheduledRetryReason, MAX_TURN_CONTINUATION_RETRY_REASON),
eq(heartbeatRuns.scheduledRetryAttempt, 1),
),
);
expect(retryRuns).toHaveLength(1);
const wakeups = await db
.select({
id: agentWakeupRequests.id,
coalescedCount: agentWakeupRequests.coalescedCount,
idempotencyKey: agentWakeupRequests.idempotencyKey,
})
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.reason, MAX_TURN_CONTINUATION_WAKE_REASON));
expect(wakeups).toHaveLength(1);
expect(wakeups[0]).toMatchObject({
id: retryRuns[0]?.wakeupRequestId,
coalescedCount: 1,
});
expect(wakeups[0]?.idempotencyKey).toContain(`:${issueId}:${runId}:1`);
const issue = await db
.select({ executionRunId: issues.executionRunId })
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue?.executionRunId).toBe(retryRuns[0]?.id);
});
it("does not promote a duplicate max-turn continuation that does not own the issue lock", async () => {
const { companyId, agentId, issueId, runId, now } = await seedMaxTurnFixture();
const scheduled = await heartbeat.scheduleBoundedRetry(runId, {
now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(scheduled.outcome).toBe("scheduled");
if (scheduled.outcome !== "scheduled") return;
const duplicateWakeupId = randomUUID();
const duplicateRunId = randomUUID();
await db.insert(agentWakeupRequests).values({
id: duplicateWakeupId,
companyId,
agentId,
source: "automation",
triggerDetail: "system",
reason: MAX_TURN_CONTINUATION_WAKE_REASON,
payload: {
issueId,
retryOfRunId: runId,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
scheduledRetryAttempt: 1,
},
status: "queued",
requestedByActorType: "system",
});
await db.insert(heartbeatRuns).values({
id: duplicateRunId,
companyId,
agentId,
invocationSource: "automation",
triggerDetail: "system",
status: "scheduled_retry",
wakeupRequestId: duplicateWakeupId,
retryOfRunId: runId,
scheduledRetryAt: scheduled.dueAt,
scheduledRetryAttempt: 1,
scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
contextSnapshot: {
issueId,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
},
});
await db
.update(agentWakeupRequests)
.set({ runId: duplicateRunId })
.where(eq(agentWakeupRequests.id, duplicateWakeupId));
const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt);
expect(promotion).toEqual({ promoted: 1, runIds: [scheduled.run.id] });
const duplicate = await db
.select({
status: heartbeatRuns.status,
errorCode: heartbeatRuns.errorCode,
})
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, duplicateRunId))
.then((rows) => rows[0] ?? null);
expect(duplicate).toEqual({
status: "cancelled",
errorCode: "issue_execution_lock_changed",
});
const duplicateWakeup = await db
.select({ status: agentWakeupRequests.status })
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, duplicateWakeupId))
.then((rows) => rows[0] ?? null);
expect(duplicateWakeup?.status).toBe("cancelled");
});
it.each(["blocked", "todo", "backlog"] as const)(
"does not schedule a max-turn continuation when the issue is already %s",
async (issueStatus) => {
const { issueId, runId, now } = await seedMaxTurnFixture({ issueStatus });
const scheduled = await heartbeat.scheduleBoundedRetry(runId, {
now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(scheduled).toMatchObject({
outcome: "not_scheduled",
errorCode: "issue_not_in_progress",
issueId,
});
const retryRuns = await db
.select({ count: sql<number>`count(*)::int` })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.retryOfRunId, runId))
.then((rows) => rows[0]?.count ?? 0);
expect(retryRuns).toBe(0);
},
);
it.each(["blocked", "todo", "backlog"] as const)(
"cancels a due max-turn continuation when the issue moves to %s before retry promotion",
async (issueStatus) => {
const { issueId, runId, now } = await seedMaxTurnFixture();
const scheduled = await heartbeat.scheduleBoundedRetry(runId, {
now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(scheduled.outcome).toBe("scheduled");
if (scheduled.outcome !== "scheduled") return;
await db.update(issues).set({
status: issueStatus,
updatedAt: new Date(now.getTime() + 500),
}).where(eq(issues.id, issueId));
const promotion = await heartbeat.promoteDueScheduledRetries(scheduled.dueAt);
expect(promotion).toEqual({ promoted: 0, runIds: [] });
const retryRun = await db
.select({
status: heartbeatRuns.status,
errorCode: heartbeatRuns.errorCode,
wakeupRequestId: heartbeatRuns.wakeupRequestId,
})
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, scheduled.run.id))
.then((rows) => rows[0] ?? null);
expect(retryRun).toMatchObject({
status: "cancelled",
errorCode: "issue_not_in_progress",
});
const wakeupRequest = await db
.select({ status: agentWakeupRequests.status })
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, retryRun?.wakeupRequestId ?? ""))
.then((rows) => rows[0] ?? null);
expect(wakeupRequest?.status).toBe("cancelled");
const issue = await db
.select({
executionRunId: issues.executionRunId,
executionAgentNameKey: issues.executionAgentNameKey,
executionLockedAt: issues.executionLockedAt,
})
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null);
expect(issue).toEqual({
executionRunId: null,
executionAgentNameKey: null,
executionLockedAt: null,
});
const event = await db
.select({
message: heartbeatRunEvents.message,
payload: heartbeatRunEvents.payload,
})
.from(heartbeatRunEvents)
.where(eq(heartbeatRunEvents.runId, scheduled.run.id))
.orderBy(sql`${heartbeatRunEvents.seq} desc`)
.then((rows) => rows[0] ?? null);
expect(event?.message).toContain("no longer in_progress");
expect(event?.payload).toMatchObject({
currentStatus: issueStatus,
requiredStatus: "in_progress",
scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
});
},
);
it("does not queue max-turn continuations after the configured cap", async () => {
const { runId, now } = await seedMaxTurnFixture({ scheduledRetryAttempt: 2 });
const exhausted = await heartbeat.scheduleBoundedRetry(runId, {
now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(exhausted).toEqual({
outcome: "retry_exhausted",
attempt: 3,
maxAttempts: 2,
});
const runCount = await db
.select({ count: sql<number>`count(*)::int` })
.from(heartbeatRuns)
.then((rows) => rows[0]?.count ?? 0);
expect(runCount).toBe(1);
const exhaustionEvent = await db
.select({ message: heartbeatRunEvents.message, payload: heartbeatRunEvents.payload })
.from(heartbeatRunEvents)
.where(eq(heartbeatRunEvents.runId, runId))
.orderBy(sql`${heartbeatRunEvents.id} desc`)
.then((rows) => rows[0] ?? null);
expect(exhaustionEvent?.message).toContain("Bounded retry exhausted");
expect(exhaustionEvent?.payload).toMatchObject({
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
maxAttempts: 2,
});
});
it("suppresses max-turn continuation scheduling when budget or dependencies block the issue", async () => {
const budgetBlocked = await seedMaxTurnFixture({ now: new Date("2026-04-20T16:00:00.000Z") });
await db.insert(budgetPolicies).values({
companyId: budgetBlocked.companyId,
scopeType: "agent",
scopeId: budgetBlocked.agentId,
windowKind: "monthly",
metric: "billed_cents",
amount: 0,
hardStopEnabled: true,
isActive: true,
});
await db
.update(agents)
.set({ status: "paused", pauseReason: "budget" })
.where(eq(agents.id, budgetBlocked.agentId));
const budgetResult = await heartbeat.scheduleBoundedRetry(budgetBlocked.runId, {
now: budgetBlocked.now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(budgetResult).toMatchObject({
outcome: "not_scheduled",
errorCode: "budget_blocked",
issueId: budgetBlocked.issueId,
});
await db.delete(budgetPolicies);
await db.delete(issueRelations);
await db.delete(issues);
await db.delete(heartbeatRunEvents);
await db.delete(heartbeatRuns);
await db.delete(agentWakeupRequests);
await db.delete(agentRuntimeState);
await db.delete(agents);
await db.delete(companies);
const dependencyBlocked = await seedMaxTurnFixture({ now: new Date("2026-04-20T17:00:00.000Z") });
const blockerId = randomUUID();
await db.insert(issues).values({
id: blockerId,
companyId: dependencyBlocked.companyId,
title: "Blocker",
status: "todo",
priority: "medium",
issueNumber: 2,
identifier: `T${dependencyBlocked.companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}-2`,
});
await db.insert(issueRelations).values({
companyId: dependencyBlocked.companyId,
issueId: blockerId,
relatedIssueId: dependencyBlocked.issueId,
type: "blocks",
});
const dependencyResult = await heartbeat.scheduleBoundedRetry(dependencyBlocked.runId, {
now: dependencyBlocked.now,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: 2,
delayMs: 1_000,
});
expect(dependencyResult).toMatchObject({
outcome: "not_scheduled",
errorCode: "issue_dependencies_blocked",
issueId: dependencyBlocked.issueId,
});
const retryRuns = await db
.select({ count: sql<number>`count(*)::int` })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.retryOfRunId, dependencyBlocked.runId))
.then((rows) => rows[0]?.count ?? 0);
expect(retryRuns).toBe(0);
});
it("does not defer a new assignee behind the previous assignee's scheduled retry", async () => {
const companyId = randomUUID();
const oldAgentId = randomUUID();

View file

@ -23,7 +23,11 @@ import {
getEmbeddedPostgresTestSupport,
startEmbeddedPostgresTestDatabase,
} from "./helpers/embedded-postgres.js";
import { heartbeatService } from "../services/heartbeat.ts";
import {
MAX_TURN_CONTINUATION_RETRY_REASON,
MAX_TURN_CONTINUATION_WAKE_REASON,
heartbeatService,
} from "../services/heartbeat.ts";
import { runningProcesses } from "../adapters/index.ts";
const mockAdapterExecute = vi.hoisted(() =>
@ -189,6 +193,7 @@ describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => {
wakeReason: string;
contextExtras?: Record<string, unknown>;
invocationSource?: "assignment" | "automation";
scheduledRetryReason?: string | null;
}) {
const wakeupRequestId = randomUUID();
const runId = randomUUID();
@ -210,6 +215,7 @@ describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => {
triggerDetail: "system",
status: "queued",
wakeupRequestId,
scheduledRetryReason: input.scheduledRetryReason ?? null,
contextSnapshot: {
issueId: input.issueId,
wakeReason: input.wakeReason,
@ -345,6 +351,154 @@ describeEmbeddedPostgres("heartbeat stale queued-run invalidation", () => {
expect(mockAdapterExecute).not.toHaveBeenCalled();
});
it("cancels queued max-turn continuations when the issue is no longer in_progress before the run starts", async () => {
const { companyId, agentId } = await seedCompanyAndAgent();
const issueId = randomUUID();
await db.insert(issues).values({
id: issueId,
companyId,
title: "Parked max-turn continuation",
status: "blocked",
priority: "medium",
assigneeAgentId: agentId,
});
const { runId, wakeupRequestId } = await seedQueuedRun({
companyId,
agentId,
issueId,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
invocationSource: "automation",
scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
contextExtras: {
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
},
});
await heartbeat.resumeQueuedRuns();
await waitForCondition(async () => {
const run = await db
.select({ status: heartbeatRuns.status })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null);
return run?.status === "cancelled";
});
const [run, wakeup] = await Promise.all([
db
.select({
status: heartbeatRuns.status,
errorCode: heartbeatRuns.errorCode,
resultJson: heartbeatRuns.resultJson,
})
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null),
db
.select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error })
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, wakeupRequestId))
.then((rows) => rows[0] ?? null),
]);
expect(run?.status).toBe("cancelled");
expect(run?.errorCode).toBe("issue_not_in_progress");
expect(run?.resultJson).toMatchObject({ stopReason: "issue_not_in_progress" });
expect(wakeup?.status).toBe("skipped");
expect(wakeup?.error).toContain("no longer in_progress");
expect(mockAdapterExecute).not.toHaveBeenCalled();
});
it("cancels queued max-turn continuations when another continuation owns the issue lock", async () => {
const { companyId, agentId } = await seedCompanyAndAgent();
const issueId = randomUUID();
const lockOwnerRunId = randomUUID();
await db.insert(heartbeatRuns).values({
id: lockOwnerRunId,
companyId,
agentId,
invocationSource: "automation",
triggerDetail: "system",
status: "scheduled_retry",
scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
scheduledRetryAttempt: 1,
scheduledRetryAt: new Date("2026-04-20T12:00:00.000Z"),
contextSnapshot: {
issueId,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
},
});
await db.insert(issues).values({
id: issueId,
companyId,
title: "Duplicate max-turn continuation",
status: "in_progress",
priority: "medium",
assigneeAgentId: agentId,
executionRunId: lockOwnerRunId,
executionAgentNameKey: "claudecoder",
executionLockedAt: new Date("2026-04-20T11:59:00.000Z"),
});
const { runId, wakeupRequestId } = await seedQueuedRun({
companyId,
agentId,
issueId,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
invocationSource: "automation",
scheduledRetryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
contextExtras: {
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
},
});
await heartbeat.resumeQueuedRuns();
await waitForCondition(async () => {
const run = await db
.select({ status: heartbeatRuns.status })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null);
return run?.status === "cancelled";
});
const [run, wakeup, issue] = await Promise.all([
db
.select({
status: heartbeatRuns.status,
errorCode: heartbeatRuns.errorCode,
resultJson: heartbeatRuns.resultJson,
})
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null),
db
.select({ status: agentWakeupRequests.status, error: agentWakeupRequests.error })
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, wakeupRequestId))
.then((rows) => rows[0] ?? null),
db
.select({ executionRunId: issues.executionRunId })
.from(issues)
.where(eq(issues.id, issueId))
.then((rows) => rows[0] ?? null),
]);
expect(run?.status).toBe("cancelled");
expect(run?.errorCode).toBe("issue_execution_lock_changed");
expect(run?.resultJson).toMatchObject({ stopReason: "issue_execution_lock_changed" });
expect(wakeup?.status).toBe("skipped");
expect(wakeup?.error).toContain("execution lock");
expect(issue?.executionRunId).toBe(lockOwnerRunId);
expect(mockAdapterExecute).not.toHaveBeenCalled();
});
it("cancels queued in_review runs when the current participant changes before the run starts", async () => {
const { companyId, agentId } = await seedCompanyAndAgent();
const otherAgentId = randomUUID();

View file

@ -64,6 +64,40 @@ describe("heartbeat stop metadata", () => {
).toBe("cancelled");
});
it("normalizes max-turn exhaustion stop reasons", () => {
expect(
buildHeartbeatRunStopMetadata({
adapterType: "claude_local",
adapterConfig: {},
outcome: "failed",
errorCode: "turn_limit_exhausted",
errorMessage: "turn limit reached",
}).stopReason,
).toBe("max_turns_exhausted");
const merged = mergeHeartbeatRunStopMetadata(
{ stopReason: "turn_limit_exhausted" },
buildHeartbeatRunStopMetadata({
adapterType: "claude_local",
adapterConfig: {},
outcome: "failed",
errorCode: "adapter_failed",
}),
);
expect(merged.stopReason).toBe("max_turns_exhausted");
});
it("prioritizes succeeded outcome over inconsistent max-turn error metadata", () => {
expect(
buildHeartbeatRunStopMetadata({
adapterType: "claude_local",
adapterConfig: {},
outcome: "succeeded",
errorCode: "max_turns_exhausted",
}).stopReason,
).toBe("completed");
});
it("preserves existing result fields when merging stop metadata", () => {
const result = mergeHeartbeatRunStopMetadata(
{ summary: "done" },

View file

@ -6,6 +6,7 @@ export type HeartbeatRunStopReason =
| "cancelled"
| "budget_paused"
| "paused"
| "max_turns_exhausted"
| "process_lost"
| "adapter_failed";
@ -40,6 +41,12 @@ function defaultTimeoutSecForAdapter(adapterType: string) {
return adapterType === "openclaw_gateway" ? 120 : 0;
}
export function normalizeMaxTurnStopReason(value: unknown): Extract<HeartbeatRunStopReason, "max_turns_exhausted"> | null {
return value === "max_turns_exhausted" || value === "turn_limit_exhausted"
? "max_turns_exhausted"
: null;
}
export function resolveHeartbeatRunTimeoutPolicy(
adapterType: string,
adapterConfig: Record<string, unknown> | null | undefined,
@ -76,6 +83,8 @@ export function inferHeartbeatRunStopReason(input: {
errorMessage?: string | null;
}): HeartbeatRunStopReason {
if (input.outcome === "succeeded") return "completed";
const maxTurnStopReason = normalizeMaxTurnStopReason(input.errorCode);
if (maxTurnStopReason) return maxTurnStopReason;
if (input.outcome === "timed_out") return "timeout";
if (input.outcome === "failed" && input.errorCode === "process_lost") return "process_lost";
if (input.outcome === "cancelled") {
@ -107,9 +116,10 @@ export function mergeHeartbeatRunStopMetadata(
resultJson: Record<string, unknown> | null | undefined,
metadata: HeartbeatRunStopMetadata,
): Record<string, unknown> {
const existingMaxTurnStopReason = normalizeMaxTurnStopReason(resultJson?.stopReason);
return {
...(resultJson ?? {}),
stopReason: metadata.stopReason,
stopReason: existingMaxTurnStopReason ?? metadata.stopReason,
effectiveTimeoutSec: metadata.effectiveTimeoutSec,
timeoutConfigured: metadata.timeoutConfigured,
timeoutSource: metadata.timeoutSource,

View file

@ -70,6 +70,7 @@ import {
import {
buildHeartbeatRunStopMetadata,
mergeHeartbeatRunStopMetadata,
normalizeMaxTurnStopReason,
} from "./heartbeat-stop-metadata.js";
import {
classifyRunLiveness,
@ -189,12 +190,25 @@ const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_JITTER_RATIO = 0.25;
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON = "transient_failure";
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_WAKE_REASON = "transient_failure_retry";
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS = BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS.length;
export const MAX_TURN_CONTINUATION_RETRY_REASON = "max_turns_continuation";
export const MAX_TURN_CONTINUATION_WAKE_REASON = "max_turns_continuation_retry";
const MAX_TURN_CONTINUATION_DEFAULT_MAX_ATTEMPTS = 2;
const MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP = 10;
const MAX_TURN_CONTINUATION_DEFAULT_DELAY_MS = 1_000;
const MAX_TURN_CONTINUATION_MAX_DELAY_MS = 5 * 60 * 1000;
const MAX_TURN_CONTINUATION_LIVE_RUN_STATUSES = ["scheduled_retry", "queued", "running"] as const;
type CodexTransientFallbackMode =
| "same_session"
| "safer_invocation"
| "fresh_session"
| "fresh_session_safer_invocation";
interface MaxTurnContinuationPolicy {
enabled: boolean;
maxAttempts: number;
delayMs: number;
}
function resolveCodexTransientFallbackMode(attempt: number): CodexTransientFallbackMode {
if (attempt <= 1) return "same_session";
if (attempt === 2) return "safer_invocation";
@ -215,6 +229,16 @@ function readHeartbeatRunErrorFamily(
return null;
}
function isMaxTurnExhaustionRun(
run: Pick<typeof heartbeatRuns.$inferSelect, "errorCode" | "resultJson">,
) {
const resultJson = parseObject(run.resultJson);
return Boolean(
normalizeMaxTurnStopReason(resultJson.stopReason) ??
normalizeMaxTurnStopReason(run.errorCode),
);
}
function readTransientRetryNotBeforeFromRun(run: Pick<typeof heartbeatRuns.$inferSelect, "resultJson">) {
const resultJson = parseObject(run.resultJson);
const value = resultJson.retryNotBefore ?? resultJson.transientRetryNotBefore;
@ -4289,6 +4313,274 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
return queued;
}
type ScheduledRetryGate =
| { allowed: true }
| {
allowed: false;
reason: string;
errorCode:
| "agent_not_invokable"
| "budget_blocked"
| "issue_not_found"
| "issue_reassigned"
| "issue_cancelled"
| "issue_terminal_status"
| "issue_not_in_progress"
| "issue_execution_lock_changed"
| "issue_review_participant_changed"
| "issue_paused"
| "issue_dependencies_blocked";
issueId: string | null;
details: Record<string, unknown>;
};
async function evaluateScheduledRetryGate(input: {
run: typeof heartbeatRuns.$inferSelect;
agent: typeof agents.$inferSelect;
contextSnapshot: Record<string, unknown>;
retryReason?: string | null;
enforceIssueExecutionLock?: boolean;
}): Promise<ScheduledRetryGate> {
const { run, agent, contextSnapshot } = input;
const retryReason =
input.retryReason ?? readNonEmptyString(contextSnapshot.retryReason) ?? run.scheduledRetryReason ?? null;
const issueId = readNonEmptyString(contextSnapshot.issueId);
const projectId = readNonEmptyString(contextSnapshot.projectId);
const budgetBlock = await budgets.getInvocationBlock(run.companyId, run.agentId, {
issueId,
projectId,
});
if (budgetBlock) {
return {
allowed: false,
reason: budgetBlock.reason,
errorCode: "budget_blocked",
issueId,
details: {
scopeType: budgetBlock.scopeType,
scopeId: budgetBlock.scopeId,
},
};
}
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
return {
allowed: false,
reason: "Scheduled retry suppressed because the agent is not invokable",
errorCode: "agent_not_invokable",
issueId,
details: {
agentId: agent.id,
agentStatus: agent.status,
},
};
}
if (!issueId) return { allowed: true };
const issue = await db
.select({
id: issues.id,
status: issues.status,
assigneeAgentId: issues.assigneeAgentId,
executionRunId: issues.executionRunId,
executionState: issues.executionState,
})
.from(issues)
.where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId)))
.then((rows) => rows[0] ?? null);
if (!issue) {
return {
allowed: false,
reason: "Scheduled retry suppressed because the target issue no longer exists",
errorCode: "issue_not_found",
issueId,
details: { issueId },
};
}
if (issue.assigneeAgentId !== run.agentId) {
return {
allowed: false,
reason: "Scheduled retry suppressed because issue ownership changed",
errorCode: "issue_reassigned",
issueId,
details: {
issueId,
previousAssigneeAgentId: run.agentId,
currentAssigneeAgentId: issue.assigneeAgentId,
},
};
}
if (issue.status === "cancelled" || issue.status === "done") {
return {
allowed: false,
reason: `Scheduled retry suppressed because issue reached terminal status (${issue.status})`,
errorCode: issue.status === "cancelled" ? "issue_cancelled" : "issue_terminal_status",
issueId,
details: { issueId, currentStatus: issue.status },
};
}
if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && issue.status !== "in_progress") {
return {
allowed: false,
reason: `Scheduled max-turn continuation suppressed because issue is no longer in_progress (current status: ${issue.status})`,
errorCode: "issue_not_in_progress",
issueId,
details: { issueId, currentStatus: issue.status, requiredStatus: "in_progress" },
};
}
if (
retryReason === MAX_TURN_CONTINUATION_RETRY_REASON &&
input.enforceIssueExecutionLock &&
issue.executionRunId !== run.id
) {
return {
allowed: false,
reason: "Scheduled max-turn continuation suppressed because the issue execution lock belongs to a different run",
errorCode: "issue_execution_lock_changed",
issueId,
details: {
issueId,
expectedExecutionRunId: run.id,
currentExecutionRunId: issue.executionRunId,
},
};
}
if (issue.status === "in_review") {
const executionState = parseIssueExecutionState(issue.executionState);
const currentParticipant = executionState?.currentParticipant ?? null;
if (currentParticipant) {
const participantMatches =
currentParticipant.type === "agent" && currentParticipant.agentId === run.agentId;
if (!participantMatches) {
return {
allowed: false,
reason: "Scheduled retry suppressed because the issue is waiting on another review participant",
errorCode: "issue_review_participant_changed",
issueId,
details: {
issueId,
currentStageType: executionState?.currentStageType ?? null,
currentParticipant,
},
};
}
}
}
const activePauseHold = await treeControlSvc.getActivePauseHoldGate(run.companyId, issueId);
if (activePauseHold) {
return {
allowed: false,
reason: "Scheduled retry suppressed because the issue is held by an active subtree pause hold",
errorCode: "issue_paused",
issueId,
details: {
issueId,
holdId: activePauseHold.holdId,
rootIssueId: activePauseHold.rootIssueId,
},
};
}
const dependencyReadiness = await issuesSvc.listDependencyReadiness(run.companyId, [issueId]);
const readiness = dependencyReadiness.get(issueId);
if (readiness && !readiness.isDependencyReady) {
return {
allowed: false,
reason: "Scheduled retry suppressed because issue dependencies are still blocked",
errorCode: "issue_dependencies_blocked",
issueId,
details: {
issueId,
unresolvedBlockerIssueIds: readiness.unresolvedBlockerIssueIds,
unresolvedBlockerCount: readiness.unresolvedBlockerCount,
},
};
}
return { allowed: true };
}
async function cancelScheduledRetryForGate(
run: typeof heartbeatRuns.$inferSelect,
gate: Extract<ScheduledRetryGate, { allowed: false }>,
now: Date,
) {
const cancelled = await db
.update(heartbeatRuns)
.set({
status: "cancelled",
finishedAt: now,
error: gate.reason,
errorCode: gate.errorCode,
updatedAt: now,
})
.where(
and(
eq(heartbeatRuns.id, run.id),
eq(heartbeatRuns.status, "scheduled_retry"),
lte(heartbeatRuns.scheduledRetryAt, now),
),
)
.returning()
.then((rows) => rows[0] ?? null);
if (!cancelled) return null;
if (cancelled.wakeupRequestId) {
await db
.update(agentWakeupRequests)
.set({
status: "cancelled",
finishedAt: now,
error: gate.reason,
updatedAt: now,
})
.where(eq(agentWakeupRequests.id, cancelled.wakeupRequestId));
}
if (gate.issueId) {
await db
.update(issues)
.set({
executionRunId: null,
executionAgentNameKey: null,
executionLockedAt: null,
updatedAt: now,
})
.where(
and(
eq(issues.companyId, cancelled.companyId),
eq(issues.id, gate.issueId),
eq(issues.executionRunId, cancelled.id),
),
);
}
await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: gate.reason,
payload: {
...gate.details,
scheduledRetryAttempt: cancelled.scheduledRetryAttempt,
scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null,
scheduledRetryReason: cancelled.scheduledRetryReason,
},
});
return cancelled;
}
async function scheduleBoundedRetryForRun(
run: typeof heartbeatRuns.$inferSelect,
agent: typeof agents.$inferSelect,
@ -4297,13 +4589,28 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
random?: () => number;
retryReason?: string;
wakeReason?: string;
maxAttempts?: number;
delayMs?: number;
},
) {
const now = opts?.now ?? new Date();
const retryReason = opts?.retryReason ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON;
const wakeReason = opts?.wakeReason ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_WAKE_REASON;
const maxAttempts = Math.max(0, Math.floor(opts?.maxAttempts ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS));
const nextAttempt = (run.scheduledRetryAttempt ?? 0) + 1;
const baseSchedule = computeBoundedTransientHeartbeatRetrySchedule(nextAttempt, now, opts?.random);
const baseSchedule = opts?.delayMs != null
? nextAttempt <= maxAttempts
? {
attempt: nextAttempt,
baseDelayMs: Math.max(0, Math.floor(opts.delayMs)),
delayMs: Math.max(0, Math.floor(opts.delayMs)),
dueAt: new Date(now.getTime() + Math.max(0, Math.floor(opts.delayMs))),
maxAttempts,
}
: null
: nextAttempt <= maxAttempts
? computeBoundedTransientHeartbeatRetrySchedule(nextAttempt, now, opts?.random)
: null;
const transientRecovery =
retryReason === BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON
? readTransientRecoveryContractFromRun(run)
@ -4323,13 +4630,13 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
payload: {
retryReason,
scheduledRetryAttempt: run.scheduledRetryAttempt ?? 0,
maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS,
maxAttempts,
},
});
return {
outcome: "retry_exhausted" as const,
attempt: nextAttempt,
maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS,
maxAttempts,
};
}
const schedule =
@ -4343,6 +4650,29 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
const contextSnapshot = parseObject(run.contextSnapshot);
const issueId = readNonEmptyString(contextSnapshot.issueId);
if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON) {
const gate = await evaluateScheduledRetryGate({ run, agent, contextSnapshot, retryReason });
if (!gate.allowed) {
await appendRunEvent(run, await nextRunEventSeq(run.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: gate.reason,
payload: {
retryReason,
scheduledRetryAttempt: nextAttempt,
maxAttempts,
...gate.details,
},
});
return {
outcome: "not_scheduled" as const,
reason: gate.reason,
errorCode: gate.errorCode,
issueId: gate.issueId,
};
}
}
const taskKey = deriveTaskKeyWithHeartbeatFallback(contextSnapshot, null);
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
const retryContextSnapshot: Record<string, unknown> = {
@ -4356,8 +4686,158 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
...(transientRetryNotBefore ? { transientRetryNotBefore: transientRetryNotBefore.toISOString() } : {}),
...(codexTransientFallbackMode ? { codexTransientFallbackMode } : {}),
};
const maxTurnContinuationIdempotencyKey = retryReason === MAX_TURN_CONTINUATION_RETRY_REASON
? `max-turn-continuation:${run.companyId}:${issueId ?? "no-issue"}:${run.id}:${schedule.attempt}`
: null;
type ScheduledRetryTransactionResult =
| {
outcome: "scheduled";
run: typeof heartbeatRuns.$inferSelect;
reusedExisting: boolean;
}
| {
outcome: "not_scheduled";
reason: string;
errorCode:
| "issue_not_found"
| "issue_reassigned"
| "issue_cancelled"
| "issue_terminal_status"
| "issue_not_in_progress"
| "issue_execution_lock_changed";
issueId: string | null;
details: Record<string, unknown>;
};
const scheduleResult = await db.transaction(async (tx): Promise<ScheduledRetryTransactionResult> => {
if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON) {
if (issueId) {
await tx.execute(
sql`select id from issues where company_id = ${run.companyId} and id = ${issueId} for update`,
);
} else {
await tx.execute(
sql`select id from heartbeat_runs where company_id = ${run.companyId} and id = ${run.id} for update`,
);
}
const existingContinuation = await tx
.select()
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.companyId, run.companyId),
eq(heartbeatRuns.retryOfRunId, run.id),
eq(heartbeatRuns.scheduledRetryReason, retryReason),
eq(heartbeatRuns.scheduledRetryAttempt, schedule.attempt),
inArray(heartbeatRuns.status, [...MAX_TURN_CONTINUATION_LIVE_RUN_STATUSES]),
issueId
? sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}`
: sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' is null`,
),
)
.orderBy(asc(heartbeatRuns.createdAt), asc(heartbeatRuns.id))
.limit(1)
.then((rows) => rows[0] ?? null);
if (existingContinuation) {
if (existingContinuation.wakeupRequestId) {
const existingWakeup = await tx
.select({ coalescedCount: agentWakeupRequests.coalescedCount })
.from(agentWakeupRequests)
.where(eq(agentWakeupRequests.id, existingContinuation.wakeupRequestId))
.then((rows) => rows[0] ?? null);
await tx
.update(agentWakeupRequests)
.set({
coalescedCount: (existingWakeup?.coalescedCount ?? 0) + 1,
updatedAt: now,
})
.where(eq(agentWakeupRequests.id, existingContinuation.wakeupRequestId));
}
return {
outcome: "scheduled",
run: existingContinuation,
reusedExisting: true,
};
}
if (issueId) {
const lockedIssue = await tx
.select({
id: issues.id,
status: issues.status,
assigneeAgentId: issues.assigneeAgentId,
executionRunId: issues.executionRunId,
})
.from(issues)
.where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId)))
.then((rows) => rows[0] ?? null);
if (!lockedIssue) {
return {
outcome: "not_scheduled",
reason: "Scheduled max-turn continuation suppressed because the target issue no longer exists",
errorCode: "issue_not_found",
issueId,
details: { issueId },
};
}
if (lockedIssue.assigneeAgentId !== run.agentId) {
return {
outcome: "not_scheduled",
reason: "Scheduled max-turn continuation suppressed because issue ownership changed",
errorCode: "issue_reassigned",
issueId,
details: {
issueId,
previousAssigneeAgentId: run.agentId,
currentAssigneeAgentId: lockedIssue.assigneeAgentId,
},
};
}
if (lockedIssue.status === "cancelled" || lockedIssue.status === "done") {
return {
outcome: "not_scheduled",
reason: `Scheduled max-turn continuation suppressed because issue reached terminal status (${lockedIssue.status})`,
errorCode: lockedIssue.status === "cancelled" ? "issue_cancelled" : "issue_terminal_status",
issueId,
details: { issueId, currentStatus: lockedIssue.status },
};
}
if (lockedIssue.status !== "in_progress") {
return {
outcome: "not_scheduled",
reason: `Scheduled max-turn continuation suppressed because issue is no longer in_progress (current status: ${lockedIssue.status})`,
errorCode: "issue_not_in_progress",
issueId,
details: { issueId, currentStatus: lockedIssue.status, requiredStatus: "in_progress" },
};
}
if (lockedIssue.executionRunId !== run.id) {
return {
outcome: "not_scheduled",
reason:
"Scheduled max-turn continuation suppressed because the issue execution lock belongs to a different run",
errorCode: "issue_execution_lock_changed",
issueId,
details: {
issueId,
expectedExecutionRunId: run.id,
currentExecutionRunId: lockedIssue.executionRunId,
},
};
}
}
}
const retryRun = await db.transaction(async (tx) => {
const wakeupRequest = await tx
.insert(agentWakeupRequests)
.values({
@ -4379,6 +4859,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
status: "queued",
requestedByActorType: "system",
requestedByActorId: null,
idempotencyKey: maxTurnContinuationIdempotencyKey,
updatedAt: now,
})
.returning()
@ -4425,9 +4906,62 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
.where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id)));
}
return scheduledRun;
return {
outcome: "scheduled",
run: scheduledRun,
reusedExisting: false,
};
});
if (scheduleResult.outcome === "not_scheduled") {
await appendRunEvent(run, await nextRunEventSeq(run.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: scheduleResult.reason,
payload: {
retryReason,
scheduledRetryAttempt: nextAttempt,
maxAttempts,
...scheduleResult.details,
},
});
return {
outcome: "not_scheduled" as const,
reason: scheduleResult.reason,
errorCode: scheduleResult.errorCode,
issueId: scheduleResult.issueId,
};
}
const retryRun = scheduleResult.run;
const dueAt = retryRun.scheduledRetryAt ? new Date(retryRun.scheduledRetryAt) : schedule.dueAt;
if (scheduleResult.reusedExisting) {
await appendRunEvent(run, await nextRunEventSeq(run.id), {
eventType: "lifecycle",
stream: "system",
level: "info",
message: `Reused existing max-turn continuation ${retryRun.scheduledRetryAttempt}/${schedule.maxAttempts}`,
payload: {
retryRunId: retryRun.id,
retryReason,
idempotencyKey: maxTurnContinuationIdempotencyKey,
scheduledRetryAttempt: retryRun.scheduledRetryAttempt,
scheduledRetryAt: dueAt.toISOString(),
},
});
return {
outcome: "scheduled" as const,
run: retryRun,
dueAt,
attempt: retryRun.scheduledRetryAttempt,
maxAttempts: schedule.maxAttempts,
reusedExisting: true,
};
}
await appendRunEvent(run, await nextRunEventSeq(run.id), {
eventType: "lifecycle",
stream: "system",
@ -4449,7 +4983,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
return {
outcome: "scheduled" as const,
run: retryRun,
dueAt: schedule.dueAt,
dueAt,
attempt: schedule.attempt,
maxAttempts: schedule.maxAttempts,
};
@ -4471,86 +5005,35 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
const promotedRunIds: string[] = [];
for (const dueRun of dueRuns) {
const dueRunIssueId = readNonEmptyString(parseObject(dueRun.contextSnapshot).issueId);
if (dueRunIssueId) {
const issue = await db
.select({
id: issues.id,
status: issues.status,
assigneeAgentId: issues.assigneeAgentId,
executionRunId: issues.executionRunId,
})
.from(issues)
.where(and(eq(issues.id, dueRunIssueId), eq(issues.companyId, dueRun.companyId)))
.then((rows) => rows[0] ?? null);
const agent = await getAgent(dueRun.agentId);
if (!agent) {
await cancelScheduledRetryForGate(dueRun, {
allowed: false,
reason: "Scheduled retry suppressed because the agent no longer exists",
errorCode: "agent_not_invokable",
issueId: readNonEmptyString(parseObject(dueRun.contextSnapshot).issueId),
details: { agentId: dueRun.agentId },
}, now);
continue;
}
if (issue && (issue.assigneeAgentId !== dueRun.agentId || issue.status === "cancelled")) {
const issueCancelled = issue.status === "cancelled";
const reason = issueCancelled
? "Cancelled because the issue was cancelled before the scheduled retry became due"
: "Cancelled because the issue was reassigned before the scheduled retry became due";
const cancelled = await db
.update(heartbeatRuns)
.set({
status: "cancelled",
finishedAt: now,
error: reason,
errorCode: issueCancelled ? "issue_cancelled" : "issue_reassigned",
updatedAt: now,
})
.where(
and(
eq(heartbeatRuns.id, dueRun.id),
eq(heartbeatRuns.status, "scheduled_retry"),
lte(heartbeatRuns.scheduledRetryAt, now),
),
)
.returning()
.then((rows) => rows[0] ?? null);
if (!cancelled) continue;
if (cancelled.wakeupRequestId) {
await db
.update(agentWakeupRequests)
.set({
status: "cancelled",
finishedAt: now,
error: reason,
updatedAt: now,
})
.where(eq(agentWakeupRequests.id, cancelled.wakeupRequestId));
}
if (issue.executionRunId === cancelled.id) {
await db
.update(issues)
.set({
executionRunId: null,
executionAgentNameKey: null,
executionLockedAt: null,
updatedAt: now,
})
.where(and(eq(issues.id, issue.id), eq(issues.executionRunId, cancelled.id)));
}
await appendRunEvent(cancelled, await nextRunEventSeq(cancelled.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: issueCancelled
? "Scheduled retry cancelled because issue was cancelled before it became due"
: "Scheduled retry cancelled because issue ownership changed before it became due",
payload: {
issueId: issue.id,
issueStatus: issue.status,
scheduledRetryAttempt: cancelled.scheduledRetryAttempt,
scheduledRetryAt: cancelled.scheduledRetryAt ? new Date(cancelled.scheduledRetryAt).toISOString() : null,
scheduledRetryReason: cancelled.scheduledRetryReason,
previousRetryAgentId: cancelled.agentId,
currentAssigneeAgentId: issue.assigneeAgentId,
},
});
const contextSnapshot = parseObject(dueRun.contextSnapshot);
const gate = await evaluateScheduledRetryGate({
run: dueRun,
agent,
contextSnapshot,
retryReason: dueRun.scheduledRetryReason,
enforceIssueExecutionLock: dueRun.scheduledRetryReason === MAX_TURN_CONTINUATION_RETRY_REASON,
});
if (!gate.allowed) {
if (
gate.errorCode === "issue_not_found" &&
dueRun.scheduledRetryReason !== MAX_TURN_CONTINUATION_RETRY_REASON
) {
// Preserve legacy transient retry behavior for runs that only carry a
// loose task context rather than a persisted issue row.
} else {
await cancelScheduledRetryForGate(dueRun, gate, now);
continue;
}
}
@ -4617,6 +5100,20 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
};
}
function parseMaxTurnContinuationPolicy(agent: typeof agents.$inferSelect): MaxTurnContinuationPolicy {
const runtimeConfig = parseObject(agent.runtimeConfig);
const heartbeat = parseObject(runtimeConfig.heartbeat);
const configured = parseObject(heartbeat.maxTurnContinuation);
const rawMaxAttempts = Math.floor(asNumber(configured.maxAttempts, MAX_TURN_CONTINUATION_DEFAULT_MAX_ATTEMPTS));
const rawDelayMs = Math.floor(asNumber(configured.delayMs, MAX_TURN_CONTINUATION_DEFAULT_DELAY_MS));
return {
enabled: asBoolean(configured.enabled, true),
maxAttempts: Math.max(0, Math.min(MAX_TURN_CONTINUATION_MAX_ATTEMPTS_CAP, rawMaxAttempts)),
delayMs: Math.max(0, Math.min(MAX_TURN_CONTINUATION_MAX_DELAY_MS, rawDelayMs)),
};
}
function issueRunPriorityRank(priority: string | null | undefined) {
switch (priority) {
case "critical":
@ -4857,6 +5354,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
| "issue_not_found"
| "issue_assignee_changed"
| "issue_terminal_status"
| "issue_not_in_progress"
| "issue_execution_lock_changed"
| "issue_review_participant_changed";
details: Record<string, unknown>;
};
@ -4871,6 +5370,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
id: issues.id,
status: issues.status,
assigneeAgentId: issues.assigneeAgentId,
executionRunId: issues.executionRunId,
executionState: issues.executionState,
})
.from(issues)
@ -4889,6 +5389,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
const wakeCommentId = deriveCommentId(context, null);
const isInteractionWake = allowsIssueInteractionWake(context);
const resumeIntent = context.resumeIntent === true || context.followUpRequested === true;
const retryReason = readNonEmptyString(context.retryReason) ?? run.scheduledRetryReason ?? null;
if (issue.assigneeAgentId !== run.agentId && !isInteractionWake) {
return {
@ -4915,6 +5416,29 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
}
}
if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && issue.status !== "in_progress") {
return {
stale: true,
errorCode: "issue_not_in_progress",
reason: `Cancelled because max-turn continuation issue is no longer in_progress (current status: ${issue.status}) before the queued run could start`,
details: { issueId, currentStatus: issue.status, requiredStatus: "in_progress" },
};
}
if (retryReason === MAX_TURN_CONTINUATION_RETRY_REASON && issue.executionRunId !== run.id) {
return {
stale: true,
errorCode: "issue_execution_lock_changed",
reason:
"Cancelled because max-turn continuation no longer owns the issue execution lock before the queued run could start",
details: {
issueId,
expectedExecutionRunId: run.id,
currentExecutionRunId: issue.executionRunId,
},
};
}
if (issue.status === "in_review") {
const executionState = parseIssueExecutionState(issue.executionState);
const currentParticipant = executionState?.currentParticipant ?? null;
@ -6683,7 +7207,28 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
);
}
}
if (outcome === "failed" && readTransientRecoveryContractFromRun(livenessRun)) {
if (outcome === "failed" && isMaxTurnExhaustionRun(livenessRun)) {
const policy = parseMaxTurnContinuationPolicy(agent);
if (policy.enabled && policy.maxAttempts > 0) {
await scheduleBoundedRetryForRun(livenessRun, agent, {
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
wakeReason: MAX_TURN_CONTINUATION_WAKE_REASON,
maxAttempts: policy.maxAttempts,
delayMs: policy.delayMs,
});
} else {
await appendRunEvent(livenessRun, await nextRunEventSeq(livenessRun.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "Max-turn continuation suppressed because the policy is disabled",
payload: {
retryReason: MAX_TURN_CONTINUATION_RETRY_REASON,
policy,
},
});
}
} else if (outcome === "failed" && readTransientRecoveryContractFromRun(livenessRun)) {
await scheduleBoundedRetryForRun(livenessRun, agent);
}
await finalizeIssueCommentPolicy(livenessRun, agent);
@ -8443,6 +8988,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
random?: () => number;
retryReason?: string;
wakeReason?: string;
maxAttempts?: number;
delayMs?: number;
},
) => {
const run = await getRun(runId, { unsafeFullResultJson: true });