fix: harden heartbeat and adapter runtime workflows

This commit is contained in:
Dotta 2026-04-10 22:26:21 -05:00
parent 548721248e
commit c566a9236c
48 changed files with 14922 additions and 600 deletions

View file

@ -13,6 +13,34 @@ function readCommentText(value: unknown) {
return trimmed.length > 0 ? trimmed : null;
}
export function mergeHeartbeatRunResultJson(
resultJson: Record<string, unknown> | null | undefined,
summary: string | null | undefined,
): Record<string, unknown> | null {
const normalizedSummary = readCommentText(summary);
const baseResult =
resultJson && typeof resultJson === "object" && !Array.isArray(resultJson)
? resultJson
: null;
if (!baseResult) {
return normalizedSummary ? { summary: normalizedSummary } : null;
}
if (!normalizedSummary) {
return baseResult;
}
if (readCommentText(baseResult.summary)) {
return baseResult;
}
return {
...baseResult,
summary: normalizedSummary,
};
}
export function summarizeHeartbeatRunResultJson(
resultJson: Record<string, unknown> | null | undefined,
): Record<string, unknown> | null {

View file

@ -32,7 +32,11 @@ import { companySkillService } from "./company-skills.js";
import { budgetService, type BudgetEnforcementScope } from "./budgets.js";
import { secretService } from "./secrets.js";
import { resolveDefaultAgentWorkspaceDir, resolveManagedProjectWorkspaceDir } from "../home-paths.js";
import { buildHeartbeatRunIssueComment, summarizeHeartbeatRunResultJson } from "./heartbeat-run-summary.js";
import {
buildHeartbeatRunIssueComment,
mergeHeartbeatRunResultJson,
summarizeHeartbeatRunResultJson,
} from "./heartbeat-run-summary.js";
import {
buildWorkspaceReadyComment,
cleanupExecutionWorkspaceArtifacts,
@ -47,6 +51,7 @@ import {
import { issueService } from "./issues.js";
import { executionWorkspaceService, mergeExecutionWorkspaceConfig } from "./execution-workspaces.js";
import { workspaceOperationService } from "./workspace-operations.js";
import { isProcessGroupAlive, terminateLocalService } from "./local-service-supervisor.js";
import {
buildExecutionWorkspaceAdapterConfig,
gateProjectExecutionWorkspacePolicy,
@ -261,6 +266,9 @@ async function ensureManagedProjectWorkspace(input: {
}
}
const heartbeatRunProcessGroupIdColumn =
heartbeatRuns.processGroupId ?? sql<number | null>`NULL`.as("processGroupId");
const heartbeatRunListColumns = {
id: heartbeatRuns.id,
companyId: heartbeatRuns.companyId,
@ -288,6 +296,7 @@ const heartbeatRunListColumns = {
errorCode: heartbeatRuns.errorCode,
externalRunId: heartbeatRuns.externalRunId,
processPid: heartbeatRuns.processPid,
processGroupId: heartbeatRunProcessGroupIdColumn,
processStartedAt: heartbeatRuns.processStartedAt,
retryOfRunId: heartbeatRuns.retryOfRunId,
processLossRetryCount: heartbeatRuns.processLossRetryCount,
@ -296,6 +305,18 @@ const heartbeatRunListColumns = {
updatedAt: heartbeatRuns.updatedAt,
} as const;
const heartbeatRunIssueSummaryColumns = {
id: heartbeatRuns.id,
status: heartbeatRuns.status,
invocationSource: heartbeatRuns.invocationSource,
triggerDetail: heartbeatRuns.triggerDetail,
startedAt: heartbeatRuns.startedAt,
finishedAt: heartbeatRuns.finishedAt,
createdAt: heartbeatRuns.createdAt,
agentId: heartbeatRuns.agentId,
issueId: sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`.as("issueId"),
} as const;
function appendExcerpt(prev: string, chunk: string) {
return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
}
@ -1026,6 +1047,46 @@ function isProcessAlive(pid: number | null | undefined) {
}
}
async function terminateHeartbeatRunProcess(input: {
pid: number | null | undefined;
processGroupId: number | null | undefined;
graceMs?: number;
}) {
const pid = input.pid ?? null;
const processGroupId = input.processGroupId ?? null;
if (typeof pid !== "number" && typeof processGroupId !== "number") return;
await terminateLocalService(
{
pid:
typeof pid === "number" && Number.isInteger(pid) && pid > 0
? pid
: (processGroupId ?? 0),
processGroupId:
typeof processGroupId === "number" && Number.isInteger(processGroupId) && processGroupId > 0
? processGroupId
: null,
},
input.graceMs ? { forceAfterMs: input.graceMs } : undefined,
);
}
function buildProcessLossMessage(run: {
processPid: number | null;
processGroupId: number | null;
}, options?: { descendantOnly?: boolean }) {
if (options?.descendantOnly && run.processGroupId) {
return `Process lost -- parent pid ${run.processPid ?? "unknown"} exited, but descendant process group ${run.processGroupId} was still alive and was terminated`;
}
if (run.processPid) {
return `Process lost -- child pid ${run.processPid} is no longer running`;
}
if (run.processGroupId) {
return `Process lost -- process group ${run.processGroupId} is no longer running`;
}
return "Process lost -- server may have restarted";
}
function truncateDisplayId(value: string | null | undefined, max = 128) {
if (!value) return null;
return value.length > max ? value.slice(0, max) : value;
@ -1824,13 +1885,14 @@ export function heartbeatService(db: Db) {
async function persistRunProcessMetadata(
runId: string,
meta: { pid: number; startedAt: string },
meta: { pid: number; processGroupId: number | null; startedAt: string },
) {
const startedAt = new Date(meta.startedAt);
return db
.update(heartbeatRuns)
.set({
processPid: meta.pid,
processGroupId: meta.processGroupId,
processStartedAt: Number.isNaN(startedAt.getTime()) ? new Date() : startedAt,
updatedAt: new Date(),
})
@ -2356,7 +2418,9 @@ export function heartbeatService(db: Db) {
}
const tracksLocalChild = isTrackedLocalChildProcessAdapter(adapterType);
if (tracksLocalChild && run.processPid && isProcessAlive(run.processPid)) {
const processPidAlive = tracksLocalChild && run.processPid && isProcessAlive(run.processPid);
const processGroupAlive = tracksLocalChild && run.processGroupId && isProcessGroupAlive(run.processGroupId);
if (processPidAlive) {
if (run.errorCode !== DETACHED_PROCESS_ERROR_CODE) {
const detachedMessage = `Lost in-memory process handle, but child pid ${run.processPid} is still alive`;
const detachedRun = await setRunStatus(run.id, "running", {
@ -2378,10 +2442,17 @@ export function heartbeatService(db: Db) {
continue;
}
const shouldRetry = tracksLocalChild && !!run.processPid && (run.processLossRetryCount ?? 0) < 1;
const baseMessage = run.processPid
? `Process lost -- child pid ${run.processPid} is no longer running`
: "Process lost -- server may have restarted";
let descendantOnlyCleanup = false;
if (processGroupAlive) {
descendantOnlyCleanup = true;
await terminateHeartbeatRunProcess({
pid: run.processPid,
processGroupId: run.processGroupId,
});
}
const shouldRetry = tracksLocalChild && (!!run.processPid || !!run.processGroupId) && (run.processLossRetryCount ?? 0) < 1;
const baseMessage = buildProcessLossMessage(run, descendantOnlyCleanup ? { descendantOnly: true } : undefined);
let finalizedRun = await setRunStatus(run.id, "failed", {
error: shouldRetry ? `${baseMessage}; retrying once` : baseMessage,
@ -2414,6 +2485,8 @@ export function heartbeatService(db: Db) {
: baseMessage,
payload: {
...(run.processPid ? { processPid: run.processPid } : {}),
...(run.processGroupId ? { processGroupId: run.processGroupId } : {}),
...(descendantOnlyCleanup ? { descendantOnlyCleanup: true } : {}),
...(retriedRun ? { retryRunId: retriedRun.id } : {}),
},
});
@ -3179,7 +3252,14 @@ export function heartbeatService(db: Db) {
onLog,
onMeta: onAdapterMeta,
onSpawn: async (meta) => {
await persistRunProcessMetadata(run.id, meta);
await persistRunProcessMetadata(run.id, {
pid: meta.pid,
processGroupId:
"processGroupId" in meta && typeof meta.processGroupId === "number"
? meta.processGroupId
: null,
startedAt: meta.startedAt,
});
},
authToken: authToken ?? undefined,
});
@ -3299,6 +3379,11 @@ export function heartbeatService(db: Db) {
} as Record<string, unknown>)
: null;
const persistedResultJson = mergeHeartbeatRunResultJson(
adapterResult.resultJson ?? null,
adapterResult.summary ?? null,
);
await setRunStatus(run.id, status, {
finishedAt: new Date(),
error:
@ -3319,7 +3404,7 @@ export function heartbeatService(db: Db) {
exitCode: adapterResult.exitCode,
signal: adapterResult.signal,
usageJson,
resultJson: adapterResult.resultJson ?? null,
resultJson: persistedResultJson,
sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId,
stdoutExcerpt,
stderrExcerpt,
@ -3347,7 +3432,7 @@ export function heartbeatService(db: Db) {
});
if (issueId && outcome === "succeeded") {
try {
const issueComment = buildHeartbeatRunIssueComment(adapterResult.resultJson ?? null);
const issueComment = buildHeartbeatRunIssueComment(persistedResultJson);
if (issueComment) {
await issuesSvc.addComment(issueId, issueComment, { agentId: agent.id, runId: finalizedRun.id });
}
@ -4242,13 +4327,16 @@ export function heartbeatService(db: Db) {
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
const graceMs = Math.max(1, running.graceSec) * 1000;
setTimeout(() => {
if (!running.child.killed) {
running.child.kill("SIGKILL");
}
}, graceMs);
await terminateHeartbeatRunProcess({
pid: running.child.pid ?? run.processPid,
processGroupId: running.processGroupId ?? run.processGroupId,
graceMs: Math.max(1, running.graceSec) * 1000,
});
} else if (run.processPid || run.processGroupId) {
await terminateHeartbeatRunProcess({
pid: run.processPid,
processGroupId: run.processGroupId,
});
}
const cancelled = await setRunStatus(run.id, "cancelled", {
@ -4298,8 +4386,17 @@ export function heartbeatService(db: Db) {
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
await terminateHeartbeatRunProcess({
pid: running.child.pid ?? run.processPid,
processGroupId: running.processGroupId ?? run.processGroupId,
graceMs: Math.max(1, running.graceSec) * 1000,
});
runningProcesses.delete(run.id);
} else if (run.processPid || run.processGroupId) {
await terminateHeartbeatRunProcess({
pid: run.processPid,
processGroupId: run.processGroupId,
});
}
await releaseIssueExecutionAndPromote(run);
}
@ -4515,6 +4612,15 @@ export function heartbeatService(db: Db) {
cancelBudgetScopeWork,
getRunIssueSummary: async (runId: string) => {
const [run] = await db
.select(heartbeatRunIssueSummaryColumns)
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.limit(1);
return run ?? null;
},
getActiveRunForAgent: async (agentId: string) => {
const [run] = await db
.select()
@ -4529,5 +4635,20 @@ export function heartbeatService(db: Db) {
.limit(1);
return run ?? null;
},
getActiveRunIssueSummaryForAgent: async (agentId: string) => {
const [run] = await db
.select(heartbeatRunIssueSummaryColumns)
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.agentId, agentId),
eq(heartbeatRuns.status, "running"),
),
)
.orderBy(desc(heartbeatRuns.startedAt))
.limit(1);
return run ?? null;
},
};
}

View file

@ -221,6 +221,17 @@ export function isPidAlive(pid: number) {
}
}
export function isProcessGroupAlive(processGroupId: number | null | undefined) {
if (process.platform === "win32") return false;
if (typeof processGroupId !== "number" || !Number.isInteger(processGroupId) || processGroupId <= 0) return false;
try {
process.kill(-processGroupId, 0);
return true;
} catch {
return false;
}
}
async function isLikelyMatchingCommand(record: LocalServiceRegistryRecord) {
if (process.platform === "win32") return true;
try {
@ -296,13 +307,19 @@ export async function terminateLocalService(
const deadline = Date.now() + (opts?.forceAfterMs ?? 2_000);
while (Date.now() < deadline) {
if (!isPidAlive(record.pid)) {
const targetAlive = targetProcessGroup
? isProcessGroupAlive(record.processGroupId)
: isPidAlive(record.pid);
if (!targetAlive) {
return;
}
await delay(100);
}
if (!isPidAlive(record.pid)) return;
const stillAlive = targetProcessGroup
? isProcessGroupAlive(record.processGroupId)
: isPidAlive(record.pid);
if (!stillAlive) return;
try {
if (targetProcessGroup) {
process.kill(-record.processGroupId!, "SIGKILL");