mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-15 18:30:39 +09:00
[codex] Harden heartbeat scheduling and runtime controls (#4223)
## Thinking Path

> - Paperclip orchestrates AI agents through issue checkout, heartbeat runs, routines, and auditable control-plane state
> - The runtime path has to recover from lost local processes, transient adapter failures, blocked dependencies, and routine coalescing without stranding work
> - The existing branch carried several reliability fixes across heartbeat scheduling, issue runtime controls, routine dispatch, and operator-facing run state
> - These changes belong together because they share backend contracts, migrations, and runtime status semantics
> - This pull request groups the control-plane/runtime slice so it can merge independently from board UI polish and adapter sandbox work
> - The benefit is safer heartbeat recovery, clearer runtime controls, and more predictable recurring execution behavior

## What Changed

- Adds bounded heartbeat retry scheduling, scheduled retry state, and Codex transient failure recovery handling.
- Tightens heartbeat process recovery, blocker wake behavior, issue comment wake handling, routine dispatch coalescing, and activity/dashboard bounds.
- Adds runtime-control MCP tools and Paperclip skill docs for issue workspace runtime management.
- Adds migrations `0061_lively_thor_girl.sql` and `0062_routine_run_dispatch_fingerprint.sql`.
- Surfaces retry state in run ledger/agent UI and keeps related shared types synchronized.

## Verification

- `pnpm exec vitest run server/src/__tests__/heartbeat-retry-scheduling.test.ts server/src/__tests__/heartbeat-process-recovery.test.ts server/src/__tests__/routines-service.test.ts`
- `pnpm exec vitest run src/tools.test.ts` from `packages/mcp-server`

## Risks

- Medium risk: this touches heartbeat recovery and routine dispatch, which are central execution paths.
- Migration order matters if split branches land out of order: merge this PR before branches that assume the new runtime/routine fields.
- Runtime retry behavior should be watched in CI and in local operator smoke tests because it changes how transient failures are resumed.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI Codex, GPT-5-based coding agent runtime, shell/git tool use enabled. Exact hosted model build and context window are not exposed in this Paperclip heartbeat environment.

## Checklist

- [x] I have included a thinking path that traces from project context to this change
- [x] I have specified the model used (with version and capability details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [ ] If this change affects the UI, I have included before/after screenshots
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before requesting merge
This commit is contained in:
parent
ab9051b595
commit
09d0678840
61 changed files with 17622 additions and 456 deletions
|
|
@ -1,4 +1,4 @@
|
|||
import { and, desc, eq, isNull, or, sql } from "drizzle-orm";
|
||||
import { and, asc, desc, eq, inArray, isNull, or, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
activityLog,
|
||||
|
|
@ -21,6 +21,15 @@ export interface ActivityFilters {
|
|||
agentId?: string;
|
||||
entityType?: string;
|
||||
entityId?: string;
|
||||
limit?: number;
|
||||
}
|
||||
|
||||
/** Row count used when the caller supplies no usable limit. */
const DEFAULT_ACTIVITY_LIMIT = 100;

/** Upper bound on the row count a single activity query may request. */
const MAX_ACTIVITY_LIMIT = 500;

/**
 * Turns a caller-supplied activity page size into a safe integer.
 *
 * @param limit Requested row count; may be missing or non-finite.
 * @returns `DEFAULT_ACTIVITY_LIMIT` when `limit` is `undefined`, `NaN`, or
 *   infinite; otherwise `limit` rounded down and clamped to
 *   `[1, MAX_ACTIVITY_LIMIT]`.
 */
export function normalizeActivityLimit(limit: number | undefined): number {
  // The typeof check narrows `limit` to number, so no `??` fallback is needed below.
  if (typeof limit !== "number" || !Number.isFinite(limit)) {
    return DEFAULT_ACTIVITY_LIMIT;
  }
  const wholeRows = Math.floor(limit);
  return Math.max(1, Math.min(MAX_ACTIVITY_LIMIT, wholeRows));
}
|
||||
|
||||
export function activityService(db: Db) {
|
||||
|
|
@ -316,6 +325,7 @@ export function activityService(db: Db) {
|
|||
return {
|
||||
list: (filters: ActivityFilters) => {
|
||||
const conditions = [eq(activityLog.companyId, filters.companyId)];
|
||||
const limit = normalizeActivityLimit(filters.limit);
|
||||
|
||||
if (filters.agentId) {
|
||||
conditions.push(eq(activityLog.agentId, filters.agentId));
|
||||
|
|
@ -347,6 +357,7 @@ export function activityService(db: Db) {
|
|||
),
|
||||
)
|
||||
.orderBy(desc(activityLog.createdAt))
|
||||
.limit(limit)
|
||||
.then((rows) => rows.map((r) => r.activityLog));
|
||||
},
|
||||
|
||||
|
|
@ -364,7 +375,7 @@ export function activityService(db: Db) {
|
|||
|
||||
runsForIssue: async (companyId: string, issueId: string) => {
|
||||
scheduleRunLivenessBackfill(companyId, issueId);
|
||||
return db
|
||||
const runs = await db
|
||||
.select({
|
||||
runId: heartbeatRuns.id,
|
||||
status: heartbeatRuns.status,
|
||||
|
|
@ -377,6 +388,10 @@ export function activityService(db: Db) {
|
|||
usageJson: summarizedUsageJson,
|
||||
resultJson: summarizedResultJson,
|
||||
logBytes: heartbeatRuns.logBytes,
|
||||
retryOfRunId: heartbeatRuns.retryOfRunId,
|
||||
scheduledRetryAt: heartbeatRuns.scheduledRetryAt,
|
||||
scheduledRetryAttempt: heartbeatRuns.scheduledRetryAttempt,
|
||||
scheduledRetryReason: heartbeatRuns.scheduledRetryReason,
|
||||
livenessState: heartbeatRuns.livenessState,
|
||||
livenessReason: heartbeatRuns.livenessReason,
|
||||
continuationAttempt: heartbeatRuns.continuationAttempt,
|
||||
|
|
@ -408,6 +423,34 @@ export function activityService(db: Db) {
|
|||
),
|
||||
)
|
||||
.orderBy(desc(heartbeatRuns.createdAt));
|
||||
|
||||
if (runs.length === 0) return runs;
|
||||
|
||||
const exhaustionRows = await db
|
||||
.select({
|
||||
runId: heartbeatRunEvents.runId,
|
||||
message: heartbeatRunEvents.message,
|
||||
})
|
||||
.from(heartbeatRunEvents)
|
||||
.where(
|
||||
and(
|
||||
inArray(heartbeatRunEvents.runId, runs.map((run) => run.runId)),
|
||||
eq(heartbeatRunEvents.eventType, "lifecycle"),
|
||||
sql`${heartbeatRunEvents.message} like 'Bounded retry exhausted%'`,
|
||||
),
|
||||
)
|
||||
.orderBy(asc(heartbeatRunEvents.runId), desc(heartbeatRunEvents.id));
|
||||
|
||||
const retryExhaustedReasonByRunId = new Map<string, string>();
|
||||
for (const row of exhaustionRows) {
|
||||
if (!row.message || retryExhaustedReasonByRunId.has(row.runId)) continue;
|
||||
retryExhaustedReasonByRunId.set(row.runId, row.message);
|
||||
}
|
||||
|
||||
return runs.map((run) => ({
|
||||
...run,
|
||||
retryExhaustedReason: retryExhaustedReasonByRunId.get(run.runId) ?? null,
|
||||
}));
|
||||
},
|
||||
|
||||
issuesForRun: async (runId: string) => {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,10 @@ function formatUtcDateKey(date: Date): string {
|
|||
return date.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
/**
 * Returns midnight UTC on the first day of the UTC month containing `date`.
 *
 * Only the UTC year and month of `date` are read, so the result does not
 * depend on the host time zone.
 *
 * @param date Any instant inside the month of interest.
 * @returns A new `Date` at `YYYY-MM-01T00:00:00.000Z`.
 */
export function getUtcMonthStart(date: Date): Date {
  const utcYear = date.getUTCFullYear();
  const utcMonth = date.getUTCMonth();
  return new Date(Date.UTC(utcYear, utcMonth, 1));
}
|
||||
|
||||
function getRecentUtcDateKeys(now: Date, days: number): string[] {
|
||||
const todayUtc = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate());
|
||||
return Array.from({ length: days }, (_, index) => {
|
||||
|
|
@ -76,7 +80,7 @@ export function dashboardService(db: Db) {
|
|||
}
|
||||
|
||||
const now = new Date();
|
||||
const monthStart = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 1));
|
||||
const monthStart = getUtcMonthStart(now);
|
||||
const runActivityDays = getRecentUtcDateKeys(now, DASHBOARD_RUN_ACTIVITY_DAYS);
|
||||
const runActivityStart = new Date(`${runActivityDays[0]}T00:00:00.000Z`);
|
||||
const [{ monthSpend }] = await db
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import path from "node:path";
|
|||
import { execFile as execFileCallback } from "node:child_process";
|
||||
import { promisify } from "node:util";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, notInArray, or, sql } from "drizzle-orm";
|
||||
import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, lte, notInArray, or, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
AGENT_DEFAULT_MAX_CONCURRENT_RUNS,
|
||||
|
|
@ -134,8 +134,31 @@ const MAX_INLINE_WAKE_COMMENTS = 8;
|
|||
const MAX_INLINE_WAKE_COMMENT_BODY_CHARS = 4_000;
|
||||
const MAX_INLINE_WAKE_COMMENT_BODY_TOTAL_CHARS = 12_000;
|
||||
const execFile = promisify(execFileCallback);
|
||||
/** Run statuses for work that is waiting in the queue or currently executing. */
const ACTIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running"] as const;

/**
 * Run statuses that count as a live execution path for an issue: the active
 * statuses plus runs parked as a scheduled retry.
 */
const EXECUTION_PATH_HEARTBEAT_RUN_STATUSES = ["queued", "running", "scheduled_retry"] as const;

// NOTE(review): presumably the statuses from which a run may still be cancelled — confirm at the call sites.
const CANCELLABLE_HEARTBEAT_RUN_STATUSES = ["queued", "running", "scheduled_retry"] as const;

// NOTE(review): presumably the terminal statuses that represent a run that did not succeed — confirm at the call sites.
const UNSUCCESSFUL_HEARTBEAT_RUN_TERMINAL_STATUSES = ["failed", "cancelled", "timed_out"] as const;

/**
 * Base delay before each bounded transient retry, indexed by `attempt - 1`:
 * 2 minutes, 10 minutes, 30 minutes, 2 hours. The number of entries is also
 * the maximum number of scheduled retry attempts.
 */
export const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS = [
  2 * 60 * 1000,
  10 * 60 * 1000,
  30 * 60 * 1000,
  2 * 60 * 60 * 1000,
] as const;

/** Fraction of the base delay used as the symmetric jitter range (0.25 means plus or minus 25%). */
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_JITTER_RATIO = 0.25;

/** Default `scheduledRetryReason` recorded on a bounded retry run. */
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON = "transient_failure";

/** Default wake reason recorded on the wakeup request and context snapshot of a bounded retry. */
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_WAKE_REASON = "transient_failure_retry";

/** Maximum number of bounded retries; derived from the delay table so the two cannot drift apart. */
const BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS = BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS.length;
|
||||
/** Recovery strategy labels attached to a retry that follows a Codex transient upstream failure. */
type CodexTransientFallbackMode =
  | "same_session"
  | "safer_invocation"
  | "fresh_session"
  | "fresh_session_safer_invocation";

/**
 * Maps a retry attempt number to the fallback mode to use for that attempt.
 *
 * @param attempt Retry attempt number, normally 1-based.
 * @returns `"same_session"` for attempts at or below 1, `"safer_invocation"`
 *   for 2, `"fresh_session"` for 3, and `"fresh_session_safer_invocation"`
 *   for every other value.
 */
function resolveCodexTransientFallbackMode(attempt: number): CodexTransientFallbackMode {
  if (attempt <= 1) return "same_session";
  switch (attempt) {
    case 2:
      return "safer_invocation";
    case 3:
      return "fresh_session";
    default:
      return "fresh_session_safer_invocation";
  }
}
|
||||
const RUNNING_ISSUE_WAKE_REASONS_REQUIRING_FOLLOWUP = new Set(["approval_approved"]);
|
||||
const SESSIONED_LOCAL_ADAPTERS = new Set([
|
||||
"claude_local",
|
||||
|
|
@ -211,6 +234,26 @@ export function applyRunScopedMentionedSkillKeys(
|
|||
]);
|
||||
}
|
||||
|
||||
/**
 * Computes when bounded transient retry number `attempt` should run.
 *
 * The base delay comes from `BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS[attempt - 1]`
 * and is scaled by a jitter multiplier in
 * `[1 - BOUNDED_TRANSIENT_HEARTBEAT_RETRY_JITTER_RATIO, 1 + BOUNDED_TRANSIENT_HEARTBEAT_RETRY_JITTER_RATIO]`.
 * The resulting delay is never shorter than one second.
 *
 * @param attempt 1-based retry attempt number.
 * @param now Instant the delay is measured from; defaults to the current time.
 * @param random Source of a jitter sample, expected in `[0, 1]`; out-of-range
 *   values are clamped. Defaults to `Math.random`.
 * @returns The schedule (`attempt`, `baseDelayMs`, `delayMs`, `dueAt`,
 *   `maxAttempts`), or `null` when `attempt` is not a positive integer or is
 *   past the end of the delay table.
 */
export function computeBoundedTransientHeartbeatRetrySchedule(
  attempt: number,
  now = new Date(),
  random: () => number = Math.random,
) {
  if (!Number.isInteger(attempt) || attempt <= 0) return null;

  const baseDelayMs = BOUNDED_TRANSIENT_HEARTBEAT_RETRY_DELAYS_MS[attempt - 1];
  if (typeof baseDelayMs !== "number") return null;

  // Math.min/Math.max propagate NaN, which would otherwise produce an Invalid
  // Date `dueAt`. Treat a NaN sample as the midpoint, i.e. no jitter.
  const rawSample = random();
  const sample = Number.isNaN(rawSample) ? 0.5 : Math.min(1, Math.max(0, rawSample));

  // Map the [0, 1] sample onto [-1, 1], then scale it by the jitter ratio.
  const jitterMultiplier = 1 + (((sample * 2) - 1) * BOUNDED_TRANSIENT_HEARTBEAT_RETRY_JITTER_RATIO);
  const delayMs = Math.max(1_000, Math.round(baseDelayMs * jitterMultiplier));

  return {
    attempt,
    baseDelayMs,
    delayMs,
    dueAt: new Date(now.getTime() + delayMs),
    maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS,
  };
}
|
||||
|
||||
async function resolveRunScopedMentionedSkillKeys(input: {
|
||||
db: Db;
|
||||
companyId: string;
|
||||
|
|
@ -466,6 +509,9 @@ const heartbeatRunListColumns = {
|
|||
processStartedAt: heartbeatRuns.processStartedAt,
|
||||
retryOfRunId: heartbeatRuns.retryOfRunId,
|
||||
processLossRetryCount: heartbeatRuns.processLossRetryCount,
|
||||
scheduledRetryAt: heartbeatRuns.scheduledRetryAt,
|
||||
scheduledRetryAttempt: heartbeatRuns.scheduledRetryAttempt,
|
||||
scheduledRetryReason: heartbeatRuns.scheduledRetryReason,
|
||||
livenessState: heartbeatRuns.livenessState,
|
||||
livenessReason: heartbeatRuns.livenessReason,
|
||||
continuationAttempt: heartbeatRuns.continuationAttempt,
|
||||
|
|
@ -1192,6 +1238,51 @@ function shouldRequireIssueCommentForWake(
|
|||
);
|
||||
}
|
||||
|
||||
/** Wake reasons that represent someone interacting with an issue through a comment. */
const BLOCKED_INTERACTION_WAKE_REASONS = new Set([
  "issue_commented",
  "issue_reopened_via_comment",
  "issue_comment_mentioned",
]);

/**
 * Decides whether a run for a dependency-blocked issue may still proceed
 * because it was triggered by a comment interaction.
 *
 * @param contextSnapshot The run's context snapshot; may be absent.
 * @returns `true` only when the snapshot's `wakeReason` is one of
 *   `BLOCKED_INTERACTION_WAKE_REASONS` and `deriveCommentId` yields a truthy
 *   value for the snapshot; otherwise `false`.
 */
function allowsBlockedIssueInteractionWake(
  contextSnapshot: Record<string, unknown> | null | undefined,
) {
  const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason);
  const isInteractionWake = Boolean(wakeReason) && BLOCKED_INTERACTION_WAKE_REASONS.has(wakeReason as string);
  if (!isInteractionWake) return false;

  // A comment-style wake reason alone is not enough: a comment id must be derivable as well.
  return Boolean(deriveCommentId(contextSnapshot, null));
}
|
||||
|
||||
/**
 * Loads summary rows for the issues that block `issueId`.
 *
 * Only issues that are both listed in `unresolvedBlockerIssueIds` and linked to
 * `issueId` by a `"blocks"` relation within `companyId` are returned, ordered
 * by title ascending.
 *
 * @param dbOrTx Database handle or transaction exposing `select`.
 * @param companyId Company that owns the relations.
 * @param issueId The blocked issue.
 * @param unresolvedBlockerIssueIds Candidate blocker ids; falsy entries and
 *   duplicates are dropped before querying.
 * @returns One row per matching blocker (`id`, `identifier`, `title`,
 *   `status`, `priority`, `assigneeAgentId`, `assigneeUserId`); an empty array
 *   when there are no candidate ids.
 */
async function listUnresolvedBlockerSummaries(
  dbOrTx: Pick<Db, "select">,
  companyId: string,
  issueId: string,
  unresolvedBlockerIssueIds: string[],
) {
  const ids = [...new Set(unresolvedBlockerIssueIds.filter(Boolean))];
  // Skip the query entirely when no candidate ids remain.
  if (ids.length === 0) return [];

  const blockerColumns = {
    id: issues.id,
    identifier: issues.identifier,
    title: issues.title,
    status: issues.status,
    priority: issues.priority,
    assigneeAgentId: issues.assigneeAgentId,
    assigneeUserId: issues.assigneeUserId,
  };

  // The relation row reads "issueId blocks relatedIssueId", so the joined issue is the blocker.
  const blocksThisIssue = and(
    eq(issueRelations.companyId, companyId),
    eq(issueRelations.type, "blocks"),
    eq(issueRelations.relatedIssueId, issueId),
    inArray(issues.id, ids),
  );

  return dbOrTx
    .select(blockerColumns)
    .from(issueRelations)
    .innerJoin(issues, eq(issueRelations.issueId, issues.id))
    .where(blocksThisIssue)
    .orderBy(asc(issues.title));
}
|
||||
|
||||
export function formatRuntimeWorkspaceWarningLog(warning: string) {
|
||||
return {
|
||||
stream: "stdout" as const,
|
||||
|
|
@ -1525,6 +1616,13 @@ async function buildPaperclipWakePayload(input: {
|
|||
}
|
||||
: null,
|
||||
checkedOutByHarness: input.contextSnapshot[PAPERCLIP_HARNESS_CHECKOUT_KEY] === true,
|
||||
dependencyBlockedInteraction: input.contextSnapshot.dependencyBlockedInteraction === true,
|
||||
unresolvedBlockerIssueIds: Array.isArray(input.contextSnapshot.unresolvedBlockerIssueIds)
|
||||
? input.contextSnapshot.unresolvedBlockerIssueIds.filter((value): value is string => typeof value === "string" && value.length > 0)
|
||||
: [],
|
||||
unresolvedBlockerSummaries: Array.isArray(input.contextSnapshot.unresolvedBlockerSummaries)
|
||||
? input.contextSnapshot.unresolvedBlockerSummaries
|
||||
: [],
|
||||
executionStage: Object.keys(executionStage).length > 0 ? executionStage : null,
|
||||
continuationSummary: continuationSummary
|
||||
? {
|
||||
|
|
@ -3057,6 +3155,219 @@ export function heartbeatService(db: Db) {
|
|||
return queued;
|
||||
}
|
||||
|
||||
/**
 * Schedules the next bounded automatic retry for `run`, or records that the
 * retry budget is exhausted.
 *
 * When a schedule is available, one transaction inserts a queued wakeup
 * request and a new heartbeat run in `scheduled_retry` status, links the two,
 * and — if the run's context names an issue whose `executionRunId` still points
 * at `run` — moves that issue's execution lock to the new run. A lifecycle
 * event is appended to the original run in both outcomes.
 *
 * @param run The run being retried.
 * @param agent The agent that owns `run`.
 * @param opts Optional overrides: `now` (clock), `random` (jitter source),
 *   `retryReason` and `wakeReason` (default to the bounded transient retry
 *   constants).
 * @returns `{ outcome: "retry_exhausted", attempt, maxAttempts }` when no
 *   further attempt is allowed, otherwise
 *   `{ outcome: "scheduled", run, dueAt, attempt, maxAttempts }`.
 */
async function scheduleBoundedRetryForRun(
  run: typeof heartbeatRuns.$inferSelect,
  agent: typeof agents.$inferSelect,
  opts?: {
    now?: Date;
    random?: () => number;
    retryReason?: string;
    wakeReason?: string;
  },
) {
  const now = opts?.now ?? new Date();
  const retryReason = opts?.retryReason ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON;
  const wakeReason = opts?.wakeReason ?? BOUNDED_TRANSIENT_HEARTBEAT_RETRY_WAKE_REASON;
  const previousAttempts = run.scheduledRetryAttempt ?? 0;
  const nextAttempt = previousAttempts + 1;
  const schedule = computeBoundedTransientHeartbeatRetrySchedule(nextAttempt, now, opts?.random);

  // The fallback mode applies only to Codex transient upstream failures on the local Codex adapter.
  const isCodexTransientFailure =
    agent.adapterType === "codex_local" &&
    retryReason === BOUNDED_TRANSIENT_HEARTBEAT_RETRY_REASON &&
    run.errorCode === "codex_transient_upstream";
  const codexTransientFallbackMode = isCodexTransientFailure
    ? resolveCodexTransientFallbackMode(nextAttempt)
    : null;

  if (!schedule) {
    // No schedule means the attempt budget is spent: record that and stop.
    await appendRunEvent(run, await nextRunEventSeq(run.id), {
      eventType: "lifecycle",
      stream: "system",
      level: "warn",
      message: `Bounded retry exhausted after ${previousAttempts} scheduled attempts; no further automatic retry will be queued`,
      payload: {
        retryReason,
        scheduledRetryAttempt: previousAttempts,
        maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS,
      },
    });
    return {
      outcome: "retry_exhausted" as const,
      attempt: nextAttempt,
      maxAttempts: BOUNDED_TRANSIENT_HEARTBEAT_RETRY_MAX_ATTEMPTS,
    };
  }

  const contextSnapshot = parseObject(run.contextSnapshot);
  const issueId = readNonEmptyString(contextSnapshot.issueId);
  const taskKey = deriveTaskKeyWithHeartbeatFallback(contextSnapshot, null);
  const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
  const scheduledRetryAtIso = schedule.dueAt.toISOString();

  // The retry inherits the original context; the retry-specific keys override any inherited values.
  const retryContextSnapshot: Record<string, unknown> = {
    ...contextSnapshot,
    retryOfRunId: run.id,
    wakeReason,
    retryReason,
    scheduledRetryAttempt: schedule.attempt,
    scheduledRetryAt: scheduledRetryAtIso,
    ...(codexTransientFallbackMode ? { codexTransientFallbackMode } : {}),
  };

  const retryRun = await db.transaction(async (tx) => {
    const wakeupRequest = await tx
      .insert(agentWakeupRequests)
      .values({
        companyId: run.companyId,
        agentId: run.agentId,
        source: "automation",
        triggerDetail: "system",
        reason: wakeReason,
        payload: {
          ...(issueId ? { issueId } : {}),
          retryOfRunId: run.id,
          retryReason,
          scheduledRetryAttempt: schedule.attempt,
          scheduledRetryAt: scheduledRetryAtIso,
          ...(codexTransientFallbackMode ? { codexTransientFallbackMode } : {}),
        },
        status: "queued",
        requestedByActorType: "system",
        requestedByActorId: null,
        updatedAt: now,
      })
      .returning()
      .then((rows) => rows[0]);

    const scheduledRun = await tx
      .insert(heartbeatRuns)
      .values({
        companyId: run.companyId,
        agentId: run.agentId,
        invocationSource: "automation",
        triggerDetail: "system",
        status: "scheduled_retry",
        wakeupRequestId: wakeupRequest.id,
        contextSnapshot: retryContextSnapshot,
        sessionIdBefore: sessionBefore,
        retryOfRunId: run.id,
        scheduledRetryAt: schedule.dueAt,
        scheduledRetryAttempt: schedule.attempt,
        scheduledRetryReason: retryReason,
        continuationAttempt: readContinuationAttempt(retryContextSnapshot.livenessContinuationAttempt),
        updatedAt: now,
      })
      .returning()
      .then((rows) => rows[0]);

    // Back-link the wakeup request to the run that was just created for it.
    await tx
      .update(agentWakeupRequests)
      .set({
        runId: scheduledRun.id,
        updatedAt: now,
      })
      .where(eq(agentWakeupRequests.id, wakeupRequest.id));

    if (issueId) {
      // Hand over the execution lock only if the issue is still locked by the run being retried.
      await tx
        .update(issues)
        .set({
          executionRunId: scheduledRun.id,
          executionAgentNameKey: normalizeAgentNameKey(agent.name),
          executionLockedAt: now,
          updatedAt: now,
        })
        .where(and(eq(issues.id, issueId), eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id)));
    }

    return scheduledRun;
  });

  await appendRunEvent(run, await nextRunEventSeq(run.id), {
    eventType: "lifecycle",
    stream: "system",
    level: "warn",
    message: `Scheduled bounded retry ${schedule.attempt}/${schedule.maxAttempts} for ${scheduledRetryAtIso}`,
    payload: {
      retryRunId: retryRun.id,
      retryReason,
      scheduledRetryAttempt: schedule.attempt,
      scheduledRetryAt: scheduledRetryAtIso,
      baseDelayMs: schedule.baseDelayMs,
      delayMs: schedule.delayMs,
      ...(codexTransientFallbackMode ? { codexTransientFallbackMode } : {}),
    },
  });

  return {
    outcome: "scheduled" as const,
    run: retryRun,
    dueAt: schedule.dueAt,
    attempt: schedule.attempt,
    maxAttempts: schedule.maxAttempts,
  };
}
|
||||
|
||||
/**
 * Moves scheduled retries whose due time has passed into the `queued` status.
 *
 * At most 50 due runs are considered per call, oldest due time first. Each run
 * is promoted with a conditional update that re-checks status and due time, so
 * a run that no longer matches by the time of the update is skipped. Every
 * promoted run gets a lifecycle event and a `heartbeat.run.queued` live event.
 *
 * @param now Instant used for the due-time comparison and as `updatedAt`;
 *   defaults to the current time.
 * @returns `{ promoted, runIds }` — the number of runs promoted and their ids.
 */
async function promoteDueScheduledRetries(now = new Date()) {
  const dueRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(
      and(
        eq(heartbeatRuns.status, "scheduled_retry"),
        lte(heartbeatRuns.scheduledRetryAt, now),
      ),
    )
    .orderBy(asc(heartbeatRuns.scheduledRetryAt), asc(heartbeatRuns.createdAt), asc(heartbeatRuns.id))
    .limit(50);

  const promotedRunIds: string[] = [];

  for (const dueRun of dueRuns) {
    // Repeating the status and due-time predicates makes the update a no-op if the row changed since the select.
    const stillDue = and(
      eq(heartbeatRuns.id, dueRun.id),
      eq(heartbeatRuns.status, "scheduled_retry"),
      lte(heartbeatRuns.scheduledRetryAt, now),
    );
    const promoted = await db
      .update(heartbeatRuns)
      .set({
        status: "queued",
        updatedAt: now,
      })
      .where(stillDue)
      .returning()
      .then((rows) => rows[0] ?? null);
    if (!promoted) continue;

    promotedRunIds.push(promoted.id);

    await appendRunEvent(promoted, await nextRunEventSeq(promoted.id), {
      eventType: "lifecycle",
      stream: "system",
      level: "info",
      message: "Scheduled retry became due and was promoted to the queued run pool",
      payload: {
        scheduledRetryAttempt: promoted.scheduledRetryAttempt,
        scheduledRetryAt: promoted.scheduledRetryAt ? new Date(promoted.scheduledRetryAt).toISOString() : null,
        scheduledRetryReason: promoted.scheduledRetryReason,
      },
    });

    publishLiveEvent({
      companyId: promoted.companyId,
      type: "heartbeat.run.queued",
      payload: {
        runId: promoted.id,
        agentId: promoted.agentId,
        invocationSource: promoted.invocationSource,
        triggerDetail: promoted.triggerDetail,
        wakeupRequestId: promoted.wakeupRequestId,
      },
    });
  }

  return {
    promoted: promotedRunIds.length,
    runIds: promotedRunIds,
  };
}
|
||||
|
||||
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
|
||||
const runtimeConfig = parseObject(agent.runtimeConfig);
|
||||
const heartbeat = parseObject(runtimeConfig.heartbeat);
|
||||
|
|
@ -3133,7 +3444,7 @@ export function heartbeatService(db: Db) {
|
|||
if (issueId) {
|
||||
const dependencyReadiness = await issuesSvc.listDependencyReadiness(run.companyId, [issueId]);
|
||||
const unresolvedBlockerCount = dependencyReadiness.get(issueId)?.unresolvedBlockerCount ?? 0;
|
||||
if (unresolvedBlockerCount > 0) {
|
||||
if (unresolvedBlockerCount > 0 && !allowsBlockedIssueInteractionWake(context)) {
|
||||
logger.debug({ runId: run.id, issueId, unresolvedBlockerCount }, "claimQueuedRun: skipping blocked run");
|
||||
return null;
|
||||
}
|
||||
|
|
@ -3600,7 +3911,7 @@ export function heartbeatService(db: Db) {
|
|||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, companyId),
|
||||
inArray(heartbeatRuns.status, [...ACTIVE_HEARTBEAT_RUN_STATUSES]),
|
||||
inArray(heartbeatRuns.status, [...EXECUTION_PATH_HEARTBEAT_RUN_STATUSES]),
|
||||
sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issueId}`,
|
||||
),
|
||||
)
|
||||
|
|
@ -3666,6 +3977,147 @@ export function heartbeatService(db: Db) {
|
|||
return queued;
|
||||
}
|
||||
|
||||
/**
 * Renders issue identifiers as a comma-separated list of Markdown links for
 * use inside an issue comment.
 *
 * Missing or empty identifiers are ignored and duplicates collapse to their
 * first occurrence. Only the first five distinct identifiers are rendered.
 * Each link targets `/<prefix>/issues/<identifier>`, where the prefix is the
 * text before the first `-`, falling back to `PAP` when that text is empty.
 *
 * @param relations Objects that may carry an issue `identifier`.
 * @returns The joined links, or the phrase `"another open issue"` when no
 *   identifier is available.
 */
function formatIssueLinksForComment(relations: Array<{ identifier?: string | null }>): string {
  const distinctIdentifiers = new Set<string>();
  for (const relation of relations) {
    if (relation.identifier) distinctIdentifiers.add(relation.identifier);
  }
  if (distinctIdentifiers.size === 0) return "another open issue";

  const links: string[] = [];
  for (const identifier of distinctIdentifiers) {
    // Cap the list at five links.
    if (links.length === 5) break;
    const prefix = identifier.split("-")[0] || "PAP";
    links.push(`[${identifier}](/${prefix}/issues/${identifier})`);
  }
  return links.join(", ");
}
|
||||
|
||||
/**
 * Finds unassigned issues that block other open work and assigns them back to
 * the agent that created them.
 *
 * A candidate is an issue in `todo` or `blocked` status with no agent or user
 * assignee, created by an agent, that `"blocks"` at least one issue of the same
 * company that is neither `done` nor `cancelled`. For each candidate whose
 * creator agent exists, belongs to the same company, and is not paused,
 * terminated, or pending approval, the function assigns the issue to that
 * agent, posts an explanatory comment, logs an `issue.updated` activity, and
 * enqueues an `issue_assigned` wakeup for the agent.
 *
 * @returns `{ assigned, skipped, issueIds }` — `assigned` counts candidates for
 *   which a wakeup was enqueued, `issueIds` lists those issues, and `skipped`
 *   counts every other distinct candidate.
 */
async function reconcileUnassignedBlockingIssues() {
  const candidates = await db
    .select({
      id: issues.id,
      companyId: issues.companyId,
      identifier: issues.identifier,
      status: issues.status,
      createdByAgentId: issues.createdByAgentId,
    })
    .from(issueRelations)
    .innerJoin(issues, eq(issueRelations.issueId, issues.id))
    .where(
      and(
        eq(issueRelations.type, "blocks"),
        inArray(issues.status, ["todo", "blocked"]),
        isNull(issues.assigneeAgentId),
        isNull(issues.assigneeUserId),
        sql`${issues.createdByAgentId} is not null`,
        sql`exists (
          select 1
          from issues blocked_issue
          where blocked_issue.id = ${issueRelations.relatedIssueId}
            and blocked_issue.company_id = ${issues.companyId}
            and blocked_issue.status not in ('done', 'cancelled')
        )`,
      ),
    );

  let assigned = 0;
  let skipped = 0;
  const issueIds: string[] = [];
  // One issue can block several others and so appear once per relation row; handle each issue once.
  const seen = new Set<string>();

  for (const candidate of candidates) {
    if (seen.has(candidate.id)) continue;
    seen.add(candidate.id);

    const creatorAgentId = candidate.createdByAgentId;
    if (!creatorAgentId) {
      skipped += 1;
      continue;
    }

    const creatorAgent = await getAgent(creatorAgentId);
    const creatorUnavailable =
      !creatorAgent ||
      creatorAgent.companyId !== candidate.companyId ||
      creatorAgent.status === "paused" ||
      creatorAgent.status === "terminated" ||
      creatorAgent.status === "pending_approval";
    if (creatorUnavailable) {
      skipped += 1;
      continue;
    }

    const relations = await issuesSvc.getRelationSummaries(candidate.id);
    const blockingLinks = formatIssueLinksForComment(relations.blocks);
    const updated = await issuesSvc.update(candidate.id, {
      assigneeAgentId: creatorAgent.id,
      assigneeUserId: null,
    });
    if (!updated) {
      skipped += 1;
      continue;
    }

    const commentBody = [
      "## Assigned Orphan Blocker",
      "",
      `Paperclip found this issue is blocking ${blockingLinks} but had no assignee, so no heartbeat could pick it up.`,
      "",
      "- Assigned it back to the agent that created the blocker.",
      "- Next action: resolve this blocker or reassign it to the right owner.",
    ].join("\n");
    await issuesSvc.addComment(candidate.id, commentBody, {});

    await logActivity(db, {
      companyId: candidate.companyId,
      actorType: "system",
      actorId: "system",
      agentId: null,
      runId: null,
      action: "issue.updated",
      entityType: "issue",
      entityId: candidate.id,
      details: {
        identifier: candidate.identifier,
        assigneeAgentId: creatorAgent.id,
        source: "heartbeat.reconcile_unassigned_blocking_issue",
      },
    });

    const queued = await enqueueWakeup(creatorAgent.id, {
      source: "automation",
      triggerDetail: "system",
      reason: "issue_assigned",
      payload: {
        issueId: candidate.id,
        mutation: "unassigned_blocker_recovery",
      },
      requestedByActorType: "system",
      requestedByActorId: null,
      contextSnapshot: {
        issueId: candidate.id,
        taskId: candidate.id,
        wakeReason: "issue_assigned",
        source: "issue.unassigned_blocker_recovery",
      },
    });

    // The candidate counts as assigned only when the wakeup was actually enqueued.
    if (queued) {
      assigned += 1;
      issueIds.push(candidate.id);
    } else {
      skipped += 1;
    }
  }

  return { assigned, skipped, issueIds };
}
|
||||
|
||||
async function escalateStrandedAssignedIssue(input: {
|
||||
issue: typeof issues.$inferSelect;
|
||||
previousStatus: "todo" | "in_progress";
|
||||
|
|
@ -3720,6 +4172,7 @@ export function heartbeatService(db: Db) {
|
|||
const result = {
|
||||
dispatchRequeued: 0,
|
||||
continuationRequeued: 0,
|
||||
orphanBlockersAssigned: 0,
|
||||
escalated: 0,
|
||||
skipped: 0,
|
||||
issueIds: [] as string[],
|
||||
|
|
@ -3795,7 +4248,6 @@ export function heartbeatService(db: Db) {
|
|||
result.skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) {
|
||||
const failureSummary = summarizeRunFailureForIssueComment(latestRun);
|
||||
const updated = await escalateStrandedAssignedIssue({
|
||||
|
|
@ -3832,6 +4284,11 @@ export function heartbeatService(db: Db) {
|
|||
}
|
||||
}
|
||||
|
||||
const orphanBlockerRecovery = await reconcileUnassignedBlockingIssues();
|
||||
result.orphanBlockersAssigned = orphanBlockerRecovery.assigned;
|
||||
result.skipped += orphanBlockerRecovery.skipped;
|
||||
result.issueIds.push(...orphanBlockerRecovery.issueIds);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -3895,7 +4352,7 @@ export function heartbeatService(db: Db) {
|
|||
contextSnapshot: heartbeatRuns.contextSnapshot,
|
||||
})
|
||||
.from(heartbeatRuns)
|
||||
.where(inArray(heartbeatRuns.status, [...ACTIVE_HEARTBEAT_RUN_STATUSES])),
|
||||
.where(inArray(heartbeatRuns.status, [...EXECUTION_PATH_HEARTBEAT_RUN_STATUSES])),
|
||||
db
|
||||
.select({
|
||||
companyId: agentWakeupRequests.companyId,
|
||||
|
|
@ -5209,6 +5666,9 @@ export function heartbeatService(db: Db) {
|
|||
);
|
||||
}
|
||||
}
|
||||
if (outcome === "failed" && livenessRun.errorCode === "codex_transient_upstream") {
|
||||
await scheduleBoundedRetryForRun(livenessRun, agent);
|
||||
}
|
||||
await finalizeIssueCommentPolicy(livenessRun, agent);
|
||||
await releaseIssueExecutionAndPromote(livenessRun);
|
||||
await handleRunLivenessContinuation(livenessRun);
|
||||
|
|
@ -5360,9 +5820,41 @@ export function heartbeatService(db: Db) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Builds the issue comment posted when an assigned issue is moved to `blocked`
 * because automatic recovery left it without a live execution path.
 *
 * @param input.status Status the issue had before being blocked; selects the
 *   "dispatch" (`todo`) or "continuation" (`in_progress`) wording.
 * @param input.latestRun Run whose failure is summarised into the comment via
 *   `summarizeRunFailureForIssueComment`; a null or undefined summary adds no
 *   text.
 * @returns The full comment text.
 */
function buildImmediateExecutionPathRecoveryComment(input: {
  status: "todo" | "in_progress";
  latestRun: Pick<typeof heartbeatRuns.$inferSelect, "error" | "errorCode"> | null | undefined;
}) {
  const failureSummary = summarizeRunFailureForIssueComment(input.latestRun);

  // Only the opening clause differs between the two statuses.
  const opening =
    input.status === "todo"
      ? "Paperclip automatically retried dispatch for this assigned `todo` issue during terminal run recovery, "
      : "Paperclip automatically retried continuation for this assigned `in_progress` issue during terminal run recovery, ";

  return (
    opening +
    `but it still has no live execution path.${failureSummary ?? ""} ` +
    "Moving it to `blocked` so it is visible for intervention."
  );
}
|
||||
|
||||
async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
|
||||
const runContext = parseObject(run.contextSnapshot);
|
||||
const contextIssueId = readNonEmptyString(runContext.issueId);
|
||||
const taskKey = deriveTaskKeyWithHeartbeatFallback(runContext, null);
|
||||
const recoveryAgent = await getAgent(run.agentId);
|
||||
const recoveryAgentInvokable =
|
||||
recoveryAgent &&
|
||||
recoveryAgent.status !== "paused" &&
|
||||
recoveryAgent.status !== "terminated" &&
|
||||
recoveryAgent.status !== "pending_approval";
|
||||
const recoverySessionBefore = recoveryAgentInvokable
|
||||
? await resolveSessionBeforeForWakeup(recoveryAgent, taskKey)
|
||||
: null;
|
||||
const recoveryAgentNameKey = normalizeAgentNameKey(recoveryAgent?.name);
|
||||
|
||||
const promotionResult = await db.transaction(async (tx) => {
|
||||
if (contextIssueId) {
|
||||
await tx.execute(
|
||||
|
|
@ -5380,6 +5872,8 @@ export function heartbeatService(db: Db) {
|
|||
companyId: issues.companyId,
|
||||
identifier: issues.identifier,
|
||||
status: issues.status,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
assigneeUserId: issues.assigneeUserId,
|
||||
executionRunId: issues.executionRunId,
|
||||
})
|
||||
.from(issues)
|
||||
|
|
@ -5421,7 +5915,7 @@ export function heartbeatService(db: Db) {
|
|||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!deferred) return null;
|
||||
if (!deferred) break;
|
||||
|
||||
const deferredAgent = await tx
|
||||
.select()
|
||||
|
|
@ -5562,16 +6056,165 @@ export function heartbeatService(db: Db) {
|
|||
.where(eq(issues.id, issue.id));
|
||||
|
||||
return {
|
||||
kind: "promoted" as const,
|
||||
run: newRun,
|
||||
reopenedActivity,
|
||||
};
|
||||
}
|
||||
|
||||
const issueNeedsImmediateRecovery =
|
||||
(issue.status === "todo" || issue.status === "in_progress") &&
|
||||
!issue.assigneeUserId &&
|
||||
issue.assigneeAgentId === run.agentId &&
|
||||
(run.status === "failed" || run.status === "timed_out" || run.status === "cancelled");
|
||||
|
||||
if (!issueNeedsImmediateRecovery) {
|
||||
return { kind: "released" as const };
|
||||
}
|
||||
|
||||
const existingExecutionPath = await tx
|
||||
.select({ id: heartbeatRuns.id })
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, issue.companyId),
|
||||
inArray(heartbeatRuns.status, [...EXECUTION_PATH_HEARTBEAT_RUN_STATUSES]),
|
||||
sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issue.id}`,
|
||||
sql`${heartbeatRuns.id} <> ${run.id}`,
|
||||
),
|
||||
)
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (existingExecutionPath) {
|
||||
return { kind: "released" as const };
|
||||
}
|
||||
|
||||
const shouldBlockImmediately =
|
||||
!recoveryAgentInvokable ||
|
||||
!recoveryAgent ||
|
||||
didAutomaticRecoveryFail(run, issue.status === "todo" ? "assignment_recovery" : "issue_continuation_needed");
|
||||
if (shouldBlockImmediately) {
|
||||
const comment = buildImmediateExecutionPathRecoveryComment({
|
||||
status: issue.status as "todo" | "in_progress",
|
||||
latestRun: run,
|
||||
});
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
status: "blocked",
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
return {
|
||||
kind: "blocked" as const,
|
||||
issueId: issue.id,
|
||||
issueIdentifier: issue.identifier,
|
||||
previousStatus: issue.status,
|
||||
comment,
|
||||
};
|
||||
}
|
||||
|
||||
const retryReason = issue.status === "todo" ? "assignment_recovery" : "issue_continuation_needed";
|
||||
const recoveryReason = issue.status === "todo" ? "issue_assignment_recovery" : "issue_continuation_needed";
|
||||
const recoverySource =
|
||||
issue.status === "todo" ? "issue.assignment_recovery" : "issue.continuation_recovery";
|
||||
const now = new Date();
|
||||
const wakeupRequest = await tx
|
||||
.insert(agentWakeupRequests)
|
||||
.values({
|
||||
companyId: issue.companyId,
|
||||
agentId: recoveryAgent.id,
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: recoveryReason,
|
||||
payload: {
|
||||
issueId: issue.id,
|
||||
retryOfRunId: run.id,
|
||||
},
|
||||
status: "queued",
|
||||
requestedByActorType: "system",
|
||||
requestedByActorId: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
const queuedRun = await tx
|
||||
.insert(heartbeatRuns)
|
||||
.values({
|
||||
companyId: issue.companyId,
|
||||
agentId: recoveryAgent.id,
|
||||
invocationSource: "automation",
|
||||
triggerDetail: "system",
|
||||
status: "queued",
|
||||
wakeupRequestId: wakeupRequest.id,
|
||||
contextSnapshot: {
|
||||
issueId: issue.id,
|
||||
taskId: issue.id,
|
||||
wakeReason: recoveryReason,
|
||||
retryReason,
|
||||
source: recoverySource,
|
||||
retryOfRunId: run.id,
|
||||
},
|
||||
sessionIdBefore: recoverySessionBefore,
|
||||
retryOfRunId: run.id,
|
||||
updatedAt: now,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
await tx
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
runId: queuedRun.id,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
|
||||
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: queuedRun.id,
|
||||
executionAgentNameKey: recoveryAgentNameKey,
|
||||
executionLockedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
|
||||
return {
|
||||
kind: "queued_recovery" as const,
|
||||
run: queuedRun,
|
||||
};
|
||||
});
|
||||
|
||||
if (promotionResult?.kind === "blocked") {
|
||||
await issuesSvc.addComment(promotionResult.issueId, promotionResult.comment, {});
|
||||
await logActivity(db, {
|
||||
companyId: run.companyId,
|
||||
actorType: "system",
|
||||
actorId: "system",
|
||||
agentId: null,
|
||||
runId: run.id,
|
||||
action: "issue.updated",
|
||||
entityType: "issue",
|
||||
entityId: promotionResult.issueId,
|
||||
details: {
|
||||
identifier: promotionResult.issueIdentifier,
|
||||
status: "blocked",
|
||||
previousStatus: promotionResult.previousStatus,
|
||||
source: "heartbeat.release_issue_execution_and_promote",
|
||||
latestRunId: run.id,
|
||||
latestRunStatus: run.status,
|
||||
latestRunErrorCode: run.errorCode ?? null,
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const promotedRun = promotionResult?.run ?? null;
|
||||
if (!promotedRun) return;
|
||||
|
||||
if (promotionResult?.reopenedActivity) {
|
||||
if (promotionResult?.kind === "promoted" && promotionResult.reopenedActivity) {
|
||||
await logActivity(db, promotionResult.reopenedActivity);
|
||||
}
|
||||
|
||||
|
|
@ -5737,7 +6380,12 @@ export function heartbeatService(db: Db) {
|
|||
.then((rows) => rows[0] ?? null)
|
||||
: null;
|
||||
|
||||
if (activeExecutionRun && activeExecutionRun.status !== "queued" && activeExecutionRun.status !== "running") {
|
||||
if (
|
||||
activeExecutionRun &&
|
||||
!EXECUTION_PATH_HEARTBEAT_RUN_STATUSES.includes(
|
||||
activeExecutionRun.status as (typeof EXECUTION_PATH_HEARTBEAT_RUN_STATUSES)[number],
|
||||
)
|
||||
) {
|
||||
activeExecutionRun = null;
|
||||
}
|
||||
|
||||
|
|
@ -5760,7 +6408,7 @@ export function heartbeatService(db: Db) {
|
|||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, issue.companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
inArray(heartbeatRuns.status, [...EXECUTION_PATH_HEARTBEAT_RUN_STATUSES]),
|
||||
sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issue.id}`,
|
||||
),
|
||||
)
|
||||
|
|
@ -5790,6 +6438,53 @@ export function heartbeatService(db: Db) {
|
|||
}
|
||||
}
|
||||
|
||||
const dependencyReadiness = await issuesSvc.listDependencyReadiness(
|
||||
issue.companyId,
|
||||
[issue.id],
|
||||
tx,
|
||||
).then((rows) => rows.get(issue.id) ?? null);
|
||||
|
||||
// Blocked descendants should stay idle until the final blocker resolves.
|
||||
// Human comment/mention wakes are the exception: they may run in a
|
||||
// bounded interaction mode so the assignee can answer or triage.
|
||||
const blockedInteractionWake =
|
||||
dependencyReadiness &&
|
||||
!dependencyReadiness.isDependencyReady &&
|
||||
allowsBlockedIssueInteractionWake(enrichedContextSnapshot);
|
||||
|
||||
if (blockedInteractionWake) {
|
||||
enrichedContextSnapshot.dependencyBlockedInteraction = true;
|
||||
enrichedContextSnapshot.unresolvedBlockerIssueIds = dependencyReadiness.unresolvedBlockerIssueIds;
|
||||
enrichedContextSnapshot.unresolvedBlockerCount = dependencyReadiness.unresolvedBlockerCount;
|
||||
enrichedContextSnapshot.unresolvedBlockerSummaries = await listUnresolvedBlockerSummaries(
|
||||
tx,
|
||||
issue.companyId,
|
||||
issue.id,
|
||||
dependencyReadiness.unresolvedBlockerIssueIds,
|
||||
);
|
||||
}
|
||||
|
||||
if (!activeExecutionRun && dependencyReadiness && !dependencyReadiness.isDependencyReady && !blockedInteractionWake) {
|
||||
await tx.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason: "issue_dependencies_blocked",
|
||||
payload: {
|
||||
...(payload ?? {}),
|
||||
issueId,
|
||||
unresolvedBlockerIssueIds: dependencyReadiness.unresolvedBlockerIssueIds,
|
||||
},
|
||||
status: "skipped",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
return { kind: "skipped" as const };
|
||||
}
|
||||
|
||||
if (activeExecutionRun) {
|
||||
const executionAgent = await tx
|
||||
.select({ name: agents.name })
|
||||
|
|
@ -5977,12 +6672,15 @@ export function heartbeatService(db: Db) {
|
|||
const activeRuns = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, [...EXECUTION_PATH_HEARTBEAT_RUN_STATUSES])))
|
||||
.orderBy(desc(heartbeatRuns.createdAt));
|
||||
|
||||
const sameScopeQueuedRun = activeRuns.find(
|
||||
(candidate) => candidate.status === "queued" && isSameTaskScope(runTaskKey(candidate), taskKey),
|
||||
);
|
||||
const sameScopeScheduledRetryRun = activeRuns.find(
|
||||
(candidate) => candidate.status === "scheduled_retry" && isSameTaskScope(runTaskKey(candidate), taskKey),
|
||||
);
|
||||
const sameScopeRunningRun = activeRuns.find(
|
||||
(candidate) => candidate.status === "running" && isSameTaskScope(runTaskKey(candidate), taskKey),
|
||||
);
|
||||
|
|
@ -5993,6 +6691,7 @@ export function heartbeatService(db: Db) {
|
|||
|
||||
const coalescedTargetRun =
|
||||
sameScopeQueuedRun ??
|
||||
sameScopeScheduledRetryRun ??
|
||||
(shouldQueueFollowupForRunningWake ? null : sameScopeRunningRun ?? null);
|
||||
|
||||
if (coalescedTargetRun) {
|
||||
|
|
@ -6103,7 +6802,7 @@ export function heartbeatService(db: Db) {
|
|||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
inArray(heartbeatRuns.status, [...CANCELLABLE_HEARTBEAT_RUN_STATUSES]),
|
||||
sql`${effectiveProjectId} = ${projectId}`,
|
||||
),
|
||||
);
|
||||
|
|
@ -6188,7 +6887,7 @@ export function heartbeatService(db: Db) {
|
|||
async function cancelRunInternal(runId: string, reason = "Cancelled by control plane") {
|
||||
const run = await getRun(runId);
|
||||
if (!run) throw notFound("Heartbeat run not found");
|
||||
if (run.status !== "running" && run.status !== "queued") return run;
|
||||
if (!CANCELLABLE_HEARTBEAT_RUN_STATUSES.includes(run.status as (typeof CANCELLABLE_HEARTBEAT_RUN_STATUSES)[number])) return run;
|
||||
const agent = await getAgent(run.agentId);
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
|
|
@ -6244,7 +6943,7 @@ export function heartbeatService(db: Db) {
|
|||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, [...CANCELLABLE_HEARTBEAT_RUN_STATUSES])));
|
||||
|
||||
for (const run of runs) {
|
||||
await setRunStatus(run.id, "cancelled", {
|
||||
|
|
@ -6300,7 +6999,7 @@ export function heartbeatService(db: Db) {
|
|||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, scope.companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
inArray(heartbeatRuns.status, [...CANCELLABLE_HEARTBEAT_RUN_STATUSES]),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id))
|
||||
|
|
@ -6471,6 +7170,25 @@ export function heartbeatService(db: Db) {
|
|||
.orderBy(asc(heartbeatRunEvents.seq))
|
||||
.limit(Math.max(1, Math.min(limit, 1000))),
|
||||
|
||||
/**
 * Looks up why bounded retries were exhausted for a run.
 *
 * Selects the `lifecycle` event with the highest id for `runId` whose message
 * starts with "Bounded retry exhausted".
 *
 * @param runId - Heartbeat run whose events are searched.
 * @returns The matching event's message, or `null` when no such event exists
 *   (or the stored message is null).
 */
getRetryExhaustedReason: async (runId: string) => {
  const exhaustedLifecycleEvent = and(
    eq(heartbeatRunEvents.runId, runId),
    eq(heartbeatRunEvents.eventType, "lifecycle"),
    sql`${heartbeatRunEvents.message} like 'Bounded retry exhausted%'`,
  );

  // Highest event id first, so the single row fetched is the latest match.
  const [latestMatch] = await db
    .select({ message: heartbeatRunEvents.message })
    .from(heartbeatRunEvents)
    .where(exhaustedLifecycleEvent)
    .orderBy(desc(heartbeatRunEvents.id))
    .limit(1);

  return latestMatch?.message ?? null;
},
|
||||
|
||||
readLog: async (
|
||||
runOrLookup: string | {
|
||||
id: string;
|
||||
|
|
@ -6525,8 +7243,26 @@ export function heartbeatService(db: Db) {
|
|||
|
||||
reapOrphanedRuns,
|
||||
|
||||
promoteDueScheduledRetries,
|
||||
|
||||
resumeQueuedRuns,
|
||||
|
||||
/**
 * Service entry point for scheduling a bounded retry by run id.
 *
 * Resolves the run (requested with `unsafeFullResultJson: true`) and its
 * agent, then delegates to `scheduleBoundedRetryForRun`.
 *
 * @param runId - Id of the heartbeat run to retry.
 * @param opts - Optional overrides forwarded unchanged to
 *   `scheduleBoundedRetryForRun` (`now`, `random`, `retryReason`, `wakeReason`).
 * @returns `{ outcome: "missing_run" }` when the run cannot be loaded,
 *   `{ outcome: "missing_agent" }` when its agent cannot be loaded, otherwise
 *   whatever `scheduleBoundedRetryForRun` returns.
 */
scheduleBoundedRetry: async (
  runId: string,
  opts?: {
    now?: Date;
    random?: () => number;
    retryReason?: string;
    wakeReason?: string;
  },
) => {
  const targetRun = await getRun(runId, { unsafeFullResultJson: true });
  if (!targetRun) {
    return { outcome: "missing_run" as const };
  }

  const owningAgent = await getAgent(targetRun.agentId);
  if (!owningAgent) {
    return { outcome: "missing_agent" as const };
  }

  return scheduleBoundedRetryForRun(targetRun, owningAgent, opts);
},
|
||||
|
||||
reconcileStrandedAssignedIssues,
|
||||
|
||||
reconcileIssueGraphLiveness,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { Buffer } from "node:buffer";
|
||||
import { and, asc, desc, eq, inArray, isNull, ne, or, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
|
|
@ -79,6 +80,7 @@ export interface IssueFilters {
|
|||
inboxArchivedByUserId?: string;
|
||||
unreadForUserId?: string;
|
||||
projectId?: string;
|
||||
workspaceId?: string;
|
||||
executionWorkspaceId?: string;
|
||||
parentId?: string;
|
||||
labelId?: string;
|
||||
|
|
@ -168,6 +170,7 @@ function sameRunLock(checkoutRunId: string | null, actorRunId: string | null) {
|
|||
|
||||
const TERMINAL_HEARTBEAT_RUN_STATUSES = new Set(["succeeded", "failed", "cancelled", "timed_out"]);
|
||||
const ISSUE_LIST_DESCRIPTION_MAX_CHARS = 1200;
|
||||
const ISSUE_LIST_DESCRIPTION_MAX_BYTES = ISSUE_LIST_DESCRIPTION_MAX_CHARS * 4;
|
||||
|
||||
function escapeLikePattern(value: string): string {
|
||||
return value.replace(/[\\%_]/g, "\\$&");
|
||||
|
|
@ -191,6 +194,16 @@ function truncateInlineSummary(value: string | null | undefined, maxChars = CHIL
|
|||
return normalized.length > maxChars ? `${normalized.slice(0, Math.max(0, maxChars - 15)).trimEnd()} [truncated]` : normalized;
|
||||
}
|
||||
|
||||
/**
 * Truncates `value` to at most `maxChars` Unicode code points.
 *
 * Iteration is by code point, so a surrogate pair (e.g. an emoji) is never
 * split in half the way a plain `slice` on UTF-16 units could.
 *
 * @param value - Text to truncate.
 * @param maxChars - Maximum number of code points to keep. Fractional values
 *   are floored; `NaN`, zero and negative values keep nothing.
 * @returns `value` unchanged when it already fits, otherwise its leading
 *   `maxChars` code points.
 */
function truncateByCodePoint(value: string, maxChars: number): string {
  // Written as a negated comparison so NaN is rejected too. Without this guard
  // a negative limit would trim from the END of the string instead of
  // returning an empty result.
  if (!(maxChars > 0)) return "";

  // The UTF-16 length is an upper bound on the code point count, so a string
  // this short can never exceed the limit.
  if (value.length <= maxChars) return value;

  const limit = Math.floor(maxChars);
  let keptCodePoints = 0;
  let endIndex = 0;
  // String iteration yields whole code points; stop as soon as the limit is
  // reached instead of materialising the entire string as an array.
  for (const codePoint of value) {
    if (keptCodePoints >= limit) break;
    endIndex += codePoint.length;
    keptCodePoints += 1;
  }
  return value.slice(0, endIndex);
}
|
||||
|
||||
/**
 * Decodes a base64-encoded text preview and caps its length.
 *
 * @param value - Base64 text, or null/undefined when there is no value.
 * @param maxChars - Limit passed to `truncateByCodePoint` for the decoded text.
 * @returns `null` for a null/undefined input; otherwise the bytes decoded as
 *   UTF-8 and truncated via `truncateByCodePoint`.
 */
function decodeDatabaseTextPreview(value: string | null | undefined, maxChars: number): string | null {
  if (value === null || value === undefined) {
    return null;
  }
  const decodedText = Buffer.from(value, "base64").toString("utf8");
  return truncateByCodePoint(decodedText, maxChars);
}
|
||||
|
||||
function appendAcceptanceCriteriaToDescription(description: string | null | undefined, acceptanceCriteria: string[] | undefined) {
|
||||
const criteria = (acceptanceCriteria ?? []).map((item) => item.trim()).filter(Boolean);
|
||||
if (criteria.length === 0) return description ?? null;
|
||||
|
|
@ -275,7 +288,6 @@ async function listUnresolvedBlockerIssueIds(
|
|||
)
|
||||
.then((rows) => rows.map((row) => row.id));
|
||||
}
|
||||
|
||||
async function getProjectDefaultGoalId(
|
||||
db: ProjectGoalReader,
|
||||
companyId: string,
|
||||
|
|
@ -681,7 +693,13 @@ const issueListSelect = {
|
|||
description: sql<string | null>`
|
||||
CASE
|
||||
WHEN ${issues.description} IS NULL THEN NULL
|
||||
ELSE substring(${issues.description} FROM 1 FOR ${ISSUE_LIST_DESCRIPTION_MAX_CHARS})
|
||||
ELSE encode(
|
||||
substring(
|
||||
convert_to(${issues.description}, current_setting('server_encoding'))
|
||||
FROM 1 FOR ${ISSUE_LIST_DESCRIPTION_MAX_BYTES}
|
||||
),
|
||||
'base64'
|
||||
)
|
||||
END
|
||||
`,
|
||||
status: issues.status,
|
||||
|
|
@ -699,6 +717,7 @@ const issueListSelect = {
|
|||
originKind: issues.originKind,
|
||||
originId: issues.originId,
|
||||
originRunId: issues.originRunId,
|
||||
originFingerprint: issues.originFingerprint,
|
||||
requestDepth: issues.requestDepth,
|
||||
billingCode: issues.billingCode,
|
||||
assigneeAdapterOverrides: issues.assigneeAdapterOverrides,
|
||||
|
|
@ -1275,6 +1294,12 @@ export function issueService(db: Db) {
|
|||
conditions.push(unreadForUserCondition(companyId, unreadForUserId));
|
||||
}
|
||||
if (filters?.projectId) conditions.push(eq(issues.projectId, filters.projectId));
|
||||
if (filters?.workspaceId) {
|
||||
conditions.push(or(
|
||||
eq(issues.executionWorkspaceId, filters.workspaceId),
|
||||
eq(issues.projectWorkspaceId, filters.workspaceId),
|
||||
)!);
|
||||
}
|
||||
if (filters?.executionWorkspaceId) {
|
||||
conditions.push(eq(issues.executionWorkspaceId, filters.executionWorkspaceId));
|
||||
}
|
||||
|
|
@ -1327,7 +1352,10 @@ export function issueService(db: Db) {
|
|||
desc(canonicalLastActivityAt),
|
||||
desc(issues.updatedAt),
|
||||
);
|
||||
const rows = limit === undefined ? await baseQuery : await baseQuery.limit(limit);
|
||||
const rows = (limit === undefined ? await baseQuery : await baseQuery.limit(limit)).map((row) => ({
|
||||
...row,
|
||||
description: decodeDatabaseTextPreview(row.description, ISSUE_LIST_DESCRIPTION_MAX_CHARS),
|
||||
}));
|
||||
const withLabels = await withIssueLabels(db, rows);
|
||||
const runMap = await activeRunMapForIssues(db, withLabels);
|
||||
const withRuns = withActiveRuns(withLabels, runMap);
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ import { queueIssueAssignmentWakeup, type IssueAssignmentWakeupDeps } from "./is
|
|||
import { logActivity } from "./activity-log.js";
|
||||
|
||||
const OPEN_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked"];
|
||||
const LIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running"];
|
||||
const LIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running", "scheduled_retry"];
|
||||
const TERMINAL_ISSUE_STATUSES = new Set(["done", "cancelled"]);
|
||||
const MAX_CATCH_UP_RUNS = 25;
|
||||
const WEEKDAY_INDEX: Record<string, number> = {
|
||||
|
|
@ -320,6 +320,37 @@ function mergeRoutineRunPayload(
|
|||
};
|
||||
}
|
||||
|
||||
/**
 * Recursively converts a value into a canonical, JSON-serialisable form so
 * that equal inputs always serialise to the same string.
 *
 * - `undefined` and `null` become `null`.
 * - Strings, numbers and booleans pass through unchanged.
 * - Valid `Date`s become ISO-8601 strings; invalid dates become
 *   `String(date)` instead of throwing.
 * - Arrays are normalised element by element, preserving order.
 * - Values accepted by `isPlainRecord` are rebuilt with their keys in sorted
 *   order and each value normalised.
 * - Anything else is converted with `String(value)`.
 *
 * @param value - Arbitrary value to canonicalise.
 * @returns The canonical representation described above.
 */
function normalizeRoutineDispatchFingerprintValue(value: unknown): unknown {
  if (value === undefined || value === null) return null;
  if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") {
    return value;
  }
  if (value instanceof Date) {
    // toISOString() throws a RangeError for an invalid Date; fall back to the
    // generic string form so fingerprinting never aborts on a bad timestamp.
    return Number.isNaN(value.getTime()) ? String(value) : value.toISOString();
  }
  if (Array.isArray(value)) {
    return value.map((item) => normalizeRoutineDispatchFingerprintValue(item));
  }
  if (isPlainRecord(value)) {
    // Sorting keys makes the result independent of property insertion order.
    return Object.fromEntries(
      Object.keys(value)
        .sort()
        .map((key) => [key, normalizeRoutineDispatchFingerprintValue(value[key])]),
    );
  }
  return String(value);
}
|
||||
|
||||
/**
 * Derives a fingerprint for a routine dispatch.
 *
 * The input is passed through `normalizeRoutineDispatchFingerprintValue`,
 * serialised with `JSON.stringify`, and hashed with SHA-256.
 *
 * @param input - The dispatch attributes that take part in the fingerprint.
 * @returns The SHA-256 digest as a hex string.
 */
function createRoutineDispatchFingerprint(input: {
  payload: Record<string, unknown> | null;
  projectId: string | null;
  assigneeAgentId: string | null;
  executionWorkspaceId?: string | null;
  executionWorkspacePreference?: string | null;
  executionWorkspaceSettings?: Record<string, unknown> | null;
  title: string;
  description: string | null;
}) {
  const normalizedInput = normalizeRoutineDispatchFingerprintValue(input);
  const hasher = crypto.createHash("sha256");
  hasher.update(JSON.stringify(normalizedInput));
  return hasher.digest("hex");
}
|
||||
|
||||
function routineUsesWorkspaceBranch(routine: typeof routines.$inferSelect) {
|
||||
return (routine.variables ?? []).some((variable) => variable.name === WORKSPACE_BRANCH_ROUTINE_VARIABLE)
|
||||
|| extractRoutineVariableNames([routine.title, routine.description]).includes(WORKSPACE_BRANCH_ROUTINE_VARIABLE);
|
||||
|
|
@ -426,6 +457,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt: routineRuns.triggeredAt,
|
||||
idempotencyKey: routineRuns.idempotencyKey,
|
||||
triggerPayload: routineRuns.triggerPayload,
|
||||
dispatchFingerprint: routineRuns.dispatchFingerprint,
|
||||
linkedIssueId: routineRuns.linkedIssueId,
|
||||
coalescedIntoRunId: routineRuns.coalescedIntoRunId,
|
||||
failureReason: routineRuns.failureReason,
|
||||
|
|
@ -458,6 +490,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt: row.triggeredAt,
|
||||
idempotencyKey: row.idempotencyKey,
|
||||
triggerPayload: row.triggerPayload as Record<string, unknown> | null,
|
||||
dispatchFingerprint: row.dispatchFingerprint,
|
||||
linkedIssueId: row.linkedIssueId,
|
||||
coalescedIntoRunId: row.coalescedIntoRunId,
|
||||
failureReason: row.failureReason,
|
||||
|
|
@ -606,7 +639,22 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
}
|
||||
}
|
||||
|
||||
async function findLiveExecutionIssue(routine: typeof routines.$inferSelect, executor: Db = db) {
|
||||
/**
 * Builds the optional fingerprint filter used when looking for a live
 * routine execution issue.
 *
 * @param dispatchFingerprint - Fingerprint of the dispatch being considered.
 * @returns `null` when no fingerprint is supplied (including an empty
 *   string); otherwise a condition matching issues whose `originFingerprint`
 *   equals either the supplied fingerprint or the literal "default".
 */
function routineExecutionFingerprintCondition(dispatchFingerprint?: string | null) {
  // The "default" arm preserves coalescing against pre-migration open issues.
  // It becomes inert once those legacy routine execution issues drain out.
  return dispatchFingerprint
    ? or(
        eq(issues.originFingerprint, dispatchFingerprint),
        eq(issues.originFingerprint, "default"),
      )
    : null;
}
|
||||
|
||||
async function findLiveExecutionIssue(
|
||||
routine: typeof routines.$inferSelect,
|
||||
executor: Db = db,
|
||||
dispatchFingerprint?: string | null,
|
||||
) {
|
||||
const fingerprintCondition = routineExecutionFingerprintCondition(dispatchFingerprint);
|
||||
const executionBoundIssue = await executor
|
||||
.select()
|
||||
.from(issues)
|
||||
|
|
@ -624,6 +672,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
eq(issues.originId, routine.id),
|
||||
inArray(issues.status, OPEN_ISSUE_STATUSES),
|
||||
isNull(issues.hiddenAt),
|
||||
...(fingerprintCondition ? [fingerprintCondition] : []),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(issues.updatedAt), desc(issues.createdAt))
|
||||
|
|
@ -649,6 +698,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
eq(issues.originId, routine.id),
|
||||
inArray(issues.status, OPEN_ISSUE_STATUSES),
|
||||
isNull(issues.hiddenAt),
|
||||
...(fingerprintCondition ? [fingerprintCondition] : []),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(issues.updatedAt), desc(issues.createdAt))
|
||||
|
|
@ -745,6 +795,16 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
const title = interpolateRoutineTemplate(input.routine.title, allVariables) ?? input.routine.title;
|
||||
const description = interpolateRoutineTemplate(input.routine.description, allVariables);
|
||||
const triggerPayload = mergeRoutineRunPayload(input.payload, { ...automaticVariables, ...resolvedVariables });
|
||||
const dispatchFingerprint = createRoutineDispatchFingerprint({
|
||||
payload: triggerPayload,
|
||||
projectId,
|
||||
assigneeAgentId,
|
||||
executionWorkspaceId: input.executionWorkspaceId ?? null,
|
||||
executionWorkspacePreference: input.executionWorkspacePreference ?? null,
|
||||
executionWorkspaceSettings: input.executionWorkspaceSettings ?? null,
|
||||
title,
|
||||
description,
|
||||
});
|
||||
const run = await db.transaction(async (tx) => {
|
||||
const txDb = tx as unknown as Db;
|
||||
await tx.execute(
|
||||
|
|
@ -782,6 +842,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt,
|
||||
idempotencyKey: input.idempotencyKey ?? null,
|
||||
triggerPayload,
|
||||
dispatchFingerprint,
|
||||
})
|
||||
.returning();
|
||||
|
||||
|
|
@ -791,7 +852,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
|
||||
let createdIssue: Awaited<ReturnType<typeof issueSvc.create>> | null = null;
|
||||
try {
|
||||
const activeIssue = await findLiveExecutionIssue(input.routine, txDb);
|
||||
const activeIssue = await findLiveExecutionIssue(input.routine, txDb, dispatchFingerprint);
|
||||
if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") {
|
||||
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
|
||||
const updated = await finalizeRun(createdRun.id, {
|
||||
|
|
@ -824,6 +885,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
originKind: "routine_execution",
|
||||
originId: input.routine.id,
|
||||
originRunId: createdRun.id,
|
||||
originFingerprint: dispatchFingerprint,
|
||||
executionWorkspaceId: input.executionWorkspaceId ?? null,
|
||||
executionWorkspacePreference: input.executionWorkspacePreference ?? null,
|
||||
executionWorkspaceSettings: input.executionWorkspaceSettings ?? null,
|
||||
|
|
@ -840,7 +902,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
throw error;
|
||||
}
|
||||
|
||||
const existingIssue = await findLiveExecutionIssue(input.routine, txDb);
|
||||
const existingIssue = await findLiveExecutionIssue(input.routine, txDb, dispatchFingerprint);
|
||||
if (!existingIssue) throw error;
|
||||
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
|
||||
const updated = await finalizeRun(createdRun.id, {
|
||||
|
|
@ -994,6 +1056,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt: routineRuns.triggeredAt,
|
||||
idempotencyKey: routineRuns.idempotencyKey,
|
||||
triggerPayload: routineRuns.triggerPayload,
|
||||
dispatchFingerprint: routineRuns.dispatchFingerprint,
|
||||
linkedIssueId: routineRuns.linkedIssueId,
|
||||
coalescedIntoRunId: routineRuns.coalescedIntoRunId,
|
||||
failureReason: routineRuns.failureReason,
|
||||
|
|
@ -1025,6 +1088,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt: run.triggeredAt,
|
||||
idempotencyKey: run.idempotencyKey,
|
||||
triggerPayload: run.triggerPayload as Record<string, unknown> | null,
|
||||
dispatchFingerprint: run.dispatchFingerprint,
|
||||
linkedIssueId: run.linkedIssueId,
|
||||
coalescedIntoRunId: run.coalescedIntoRunId,
|
||||
failureReason: run.failureReason,
|
||||
|
|
@ -1437,6 +1501,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt: routineRuns.triggeredAt,
|
||||
idempotencyKey: routineRuns.idempotencyKey,
|
||||
triggerPayload: routineRuns.triggerPayload,
|
||||
dispatchFingerprint: routineRuns.dispatchFingerprint,
|
||||
linkedIssueId: routineRuns.linkedIssueId,
|
||||
coalescedIntoRunId: routineRuns.coalescedIntoRunId,
|
||||
failureReason: routineRuns.failureReason,
|
||||
|
|
@ -1468,6 +1533,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
|
|||
triggeredAt: row.triggeredAt,
|
||||
idempotencyKey: row.idempotencyKey,
|
||||
triggerPayload: row.triggerPayload as Record<string, unknown> | null,
|
||||
dispatchFingerprint: row.dispatchFingerprint,
|
||||
linkedIssueId: row.linkedIssueId,
|
||||
coalescedIntoRunId: row.coalescedIntoRunId,
|
||||
failureReason: row.failureReason,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue