Harden budget enforcement and migration startup

This commit is contained in:
Dotta 2026-03-16 08:12:50 -05:00
parent 411952573e
commit 5f2c2ee0e2
15 changed files with 9473 additions and 122 deletions

View file

@ -23,7 +23,7 @@ import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
import { costService } from "./costs.js";
import { budgetService } from "./budgets.js";
import { budgetService, type BudgetEnforcementScope } from "./budgets.js";
import { secretService } from "./secrets.js";
import { resolveDefaultAgentWorkspaceDir } from "../home-paths.js";
import { summarizeHeartbeatRunResultJson } from "./heartbeat-run-summary.js";
@ -617,10 +617,13 @@ function resolveNextSessionState(input: {
export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
const budgets = budgetService(db);
const secretsSvc = secretService(db);
const issuesSvc = issueService(db);
const activeRunExecutions = new Set<string>();
const budgetHooks = {
cancelWorkForScope: cancelBudgetScopeWork,
};
const budgets = budgetService(db, budgetHooks);
async function getAgent(agentId: string) {
return db
@ -1203,6 +1206,26 @@ export function heartbeatService(db: Db) {
async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) {
if (run.status !== "queued") return run;
const agent = await getAgent(run.agentId);
if (!agent) {
await cancelRunInternal(run.id, "Cancelled because the agent no longer exists");
return null;
}
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
await cancelRunInternal(run.id, "Cancelled because the agent is not invokable");
return null;
}
const context = parseObject(run.contextSnapshot);
const budgetBlock = await budgets.getInvocationBlock(run.companyId, run.agentId, {
issueId: readNonEmptyString(context.issueId),
projectId: readNonEmptyString(context.projectId),
});
if (budgetBlock) {
await cancelRunInternal(run.id, budgetBlock.reason);
return null;
}
const claimedAt = new Date();
const claimed = await db
.update(heartbeatRuns)
@ -1382,7 +1405,7 @@ export function heartbeatService(db: Db) {
.where(eq(agentRuntimeState.agentId, agent.id));
if (additionalCostCents > 0 || hasTokenUsage) {
const costs = costService(db);
const costs = costService(db, budgetHooks);
await costs.createEvent(agent.companyId, {
heartbeatRunId: run.id,
agentId: agent.id,
@ -1405,6 +1428,9 @@ export function heartbeatService(db: Db) {
return withAgentStartLock(agentId, async () => {
const agent = await getAgent(agentId);
if (!agent) return [];
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
return [];
}
const policy = parseHeartbeatPolicy(agent);
const runningCount = await countRunningRunsForAgent(agentId);
const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount);
@ -2758,6 +2784,205 @@ export function heartbeatService(db: Db) {
return newRun;
}
async function listProjectScopedRunIds(companyId: string, projectId: string) {
const runIssueId = sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`;
const effectiveProjectId = sql<string | null>`coalesce(${heartbeatRuns.contextSnapshot} ->> 'projectId', ${issues.projectId}::text)`;
const rows = await db
.selectDistinctOn([heartbeatRuns.id], { id: heartbeatRuns.id })
.from(heartbeatRuns)
.leftJoin(
issues,
and(
eq(issues.companyId, companyId),
sql`${issues.id}::text = ${runIssueId}`,
),
)
.where(
and(
eq(heartbeatRuns.companyId, companyId),
inArray(heartbeatRuns.status, ["queued", "running"]),
sql`${effectiveProjectId} = ${projectId}`,
),
);
return rows.map((row) => row.id);
}
async function listProjectScopedWakeupIds(companyId: string, projectId: string) {
const wakeIssueId = sql<string | null>`${agentWakeupRequests.payload} ->> 'issueId'`;
const effectiveProjectId = sql<string | null>`coalesce(${agentWakeupRequests.payload} ->> 'projectId', ${issues.projectId}::text)`;
const rows = await db
.selectDistinctOn([agentWakeupRequests.id], { id: agentWakeupRequests.id })
.from(agentWakeupRequests)
.leftJoin(
issues,
and(
eq(issues.companyId, companyId),
sql`${issues.id}::text = ${wakeIssueId}`,
),
)
.where(
and(
eq(agentWakeupRequests.companyId, companyId),
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
sql`${agentWakeupRequests.runId} is null`,
sql`${effectiveProjectId} = ${projectId}`,
),
);
return rows.map((row) => row.id);
}
async function cancelPendingWakeupsForBudgetScope(scope: BudgetEnforcementScope) {
const now = new Date();
let wakeupIds: string[] = [];
if (scope.scopeType === "company") {
wakeupIds = await db
.select({ id: agentWakeupRequests.id })
.from(agentWakeupRequests)
.where(
and(
eq(agentWakeupRequests.companyId, scope.companyId),
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
sql`${agentWakeupRequests.runId} is null`,
),
)
.then((rows) => rows.map((row) => row.id));
} else if (scope.scopeType === "agent") {
wakeupIds = await db
.select({ id: agentWakeupRequests.id })
.from(agentWakeupRequests)
.where(
and(
eq(agentWakeupRequests.companyId, scope.companyId),
eq(agentWakeupRequests.agentId, scope.scopeId),
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
sql`${agentWakeupRequests.runId} is null`,
),
)
.then((rows) => rows.map((row) => row.id));
} else {
wakeupIds = await listProjectScopedWakeupIds(scope.companyId, scope.scopeId);
}
if (wakeupIds.length === 0) return 0;
await db
.update(agentWakeupRequests)
.set({
status: "cancelled",
finishedAt: now,
error: "Cancelled due to budget pause",
updatedAt: now,
})
.where(inArray(agentWakeupRequests.id, wakeupIds));
return wakeupIds.length;
}
async function cancelRunInternal(runId: string, reason = "Cancelled by control plane") {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (run.status !== "running" && run.status !== "queued") return run;
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
const graceMs = Math.max(1, running.graceSec) * 1000;
setTimeout(() => {
if (!running.child.killed) {
running.child.kill("SIGKILL");
}
}, graceMs);
}
const cancelled = await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: reason,
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: reason,
});
if (cancelled) {
await appendRunEvent(cancelled, 1, {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "run cancelled",
});
await releaseIssueExecutionAndPromote(cancelled);
}
runningProcesses.delete(run.id);
await finalizeAgentStatus(run.agentId, "cancelled");
await startNextQueuedRunForAgent(run.agentId);
return cancelled;
}
async function cancelActiveForAgentInternal(agentId: string, reason = "Cancelled due to agent pause") {
const runs = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
for (const run of runs) {
await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: reason,
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: reason,
});
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
runningProcesses.delete(run.id);
}
await releaseIssueExecutionAndPromote(run);
}
return runs.length;
}
async function cancelBudgetScopeWork(scope: BudgetEnforcementScope) {
if (scope.scopeType === "agent") {
await cancelActiveForAgentInternal(scope.scopeId, "Cancelled due to budget pause");
await cancelPendingWakeupsForBudgetScope(scope);
return;
}
const runIds =
scope.scopeType === "company"
? await db
.select({ id: heartbeatRuns.id })
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.companyId, scope.companyId),
inArray(heartbeatRuns.status, ["queued", "running"]),
),
)
.then((rows) => rows.map((row) => row.id))
: await listProjectScopedRunIds(scope.companyId, scope.scopeId);
for (const runId of runIds) {
await cancelRunInternal(runId, "Cancelled due to budget pause");
}
await cancelPendingWakeupsForBudgetScope(scope);
}
return {
list: async (companyId: string, agentId?: string, limit?: number) => {
const query = db
@ -2930,77 +3155,11 @@ export function heartbeatService(db: Db) {
return { checked, enqueued, skipped };
},
cancelRun: async (runId: string) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (run.status !== "running" && run.status !== "queued") return run;
cancelRun: (runId: string) => cancelRunInternal(runId),
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
const graceMs = Math.max(1, running.graceSec) * 1000;
setTimeout(() => {
if (!running.child.killed) {
running.child.kill("SIGKILL");
}
}, graceMs);
}
cancelActiveForAgent: (agentId: string) => cancelActiveForAgentInternal(agentId),
const cancelled = await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
});
if (cancelled) {
await appendRunEvent(cancelled, 1, {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "run cancelled",
});
await releaseIssueExecutionAndPromote(cancelled);
}
runningProcesses.delete(run.id);
await finalizeAgentStatus(run.agentId, "cancelled");
await startNextQueuedRunForAgent(run.agentId);
return cancelled;
},
cancelActiveForAgent: async (agentId: string) => {
const runs = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
for (const run of runs) {
await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
});
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
runningProcesses.delete(run.id);
}
await releaseIssueExecutionAndPromote(run);
}
return runs.length;
},
cancelBudgetScopeWork,
getActiveRunForAgent: async (agentId: string) => {
const [run] = await db