mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-18 03:30:39 +09:00
Merge public-gh/master into paperclip-company-import-export
This commit is contained in:
commit
cca086b863
125 changed files with 38085 additions and 683 deletions
|
|
@ -2,6 +2,7 @@ import fs from "node:fs/promises";
|
|||
import path from "node:path";
|
||||
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import type { BillingType } from "@paperclipai/shared";
|
||||
import {
|
||||
agents,
|
||||
agentRuntimeState,
|
||||
|
|
@ -23,6 +24,7 @@ import { createLocalAgentJwt } from "../agent-auth-jwt.js";
|
|||
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
|
||||
import { costService } from "./costs.js";
|
||||
import { companySkillService } from "./company-skills.js";
|
||||
import { budgetService, type BudgetEnforcementScope } from "./budgets.js";
|
||||
import { secretService } from "./secrets.js";
|
||||
import { resolveDefaultAgentWorkspaceDir } from "../home-paths.js";
|
||||
import { summarizeHeartbeatRunResultJson } from "./heartbeat-run-summary.js";
|
||||
|
|
@ -171,6 +173,67 @@ function readNonEmptyString(value: unknown): string | null {
|
|||
return typeof value === "string" && value.trim().length > 0 ? value : null;
|
||||
}
|
||||
|
||||
function normalizeLedgerBillingType(value: unknown): BillingType {
|
||||
const raw = readNonEmptyString(value);
|
||||
switch (raw) {
|
||||
case "api":
|
||||
case "metered_api":
|
||||
return "metered_api";
|
||||
case "subscription":
|
||||
case "subscription_included":
|
||||
return "subscription_included";
|
||||
case "subscription_overage":
|
||||
return "subscription_overage";
|
||||
case "credits":
|
||||
return "credits";
|
||||
case "fixed":
|
||||
return "fixed";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
function resolveLedgerBiller(result: AdapterExecutionResult): string {
|
||||
return readNonEmptyString(result.biller) ?? readNonEmptyString(result.provider) ?? "unknown";
|
||||
}
|
||||
|
||||
function normalizeBilledCostCents(costUsd: number | null | undefined, billingType: BillingType): number {
|
||||
if (billingType === "subscription_included") return 0;
|
||||
if (typeof costUsd !== "number" || !Number.isFinite(costUsd)) return 0;
|
||||
return Math.max(0, Math.round(costUsd * 100));
|
||||
}
|
||||
|
||||
async function resolveLedgerScopeForRun(
|
||||
db: Db,
|
||||
companyId: string,
|
||||
run: typeof heartbeatRuns.$inferSelect,
|
||||
) {
|
||||
const context = parseObject(run.contextSnapshot);
|
||||
const contextIssueId = readNonEmptyString(context.issueId);
|
||||
const contextProjectId = readNonEmptyString(context.projectId);
|
||||
|
||||
if (!contextIssueId) {
|
||||
return {
|
||||
issueId: null,
|
||||
projectId: contextProjectId,
|
||||
};
|
||||
}
|
||||
|
||||
const issue = await db
|
||||
.select({
|
||||
id: issues.id,
|
||||
projectId: issues.projectId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(and(eq(issues.id, contextIssueId), eq(issues.companyId, companyId)))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
return {
|
||||
issueId: issue?.id ?? null,
|
||||
projectId: issue?.projectId ?? contextProjectId,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeUsageTotals(usage: UsageSummary | null | undefined): UsageTotals | null {
|
||||
if (!usage) return null;
|
||||
return {
|
||||
|
|
@ -559,6 +622,10 @@ export function heartbeatService(db: Db) {
|
|||
const companySkills = companySkillService(db);
|
||||
const issuesSvc = issueService(db);
|
||||
const activeRunExecutions = new Set<string>();
|
||||
const budgetHooks = {
|
||||
cancelWorkForScope: cancelBudgetScopeWork,
|
||||
};
|
||||
const budgets = budgetService(db, budgetHooks);
|
||||
|
||||
async function getAgent(agentId: string) {
|
||||
return db
|
||||
|
|
@ -1141,6 +1208,26 @@ export function heartbeatService(db: Db) {
|
|||
|
||||
async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) {
|
||||
if (run.status !== "queued") return run;
|
||||
const agent = await getAgent(run.agentId);
|
||||
if (!agent) {
|
||||
await cancelRunInternal(run.id, "Cancelled because the agent no longer exists");
|
||||
return null;
|
||||
}
|
||||
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
|
||||
await cancelRunInternal(run.id, "Cancelled because the agent is not invokable");
|
||||
return null;
|
||||
}
|
||||
|
||||
const context = parseObject(run.contextSnapshot);
|
||||
const budgetBlock = await budgets.getInvocationBlock(run.companyId, run.agentId, {
|
||||
issueId: readNonEmptyString(context.issueId),
|
||||
projectId: readNonEmptyString(context.projectId),
|
||||
});
|
||||
if (budgetBlock) {
|
||||
await cancelRunInternal(run.id, budgetBlock.reason);
|
||||
return null;
|
||||
}
|
||||
|
||||
const claimedAt = new Date();
|
||||
const claimed = await db
|
||||
.update(heartbeatRuns)
|
||||
|
|
@ -1296,8 +1383,12 @@ export function heartbeatService(db: Db) {
|
|||
const inputTokens = usage?.inputTokens ?? 0;
|
||||
const outputTokens = usage?.outputTokens ?? 0;
|
||||
const cachedInputTokens = usage?.cachedInputTokens ?? 0;
|
||||
const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));
|
||||
const billingType = normalizeLedgerBillingType(result.billingType);
|
||||
const additionalCostCents = normalizeBilledCostCents(result.costUsd, billingType);
|
||||
const hasTokenUsage = inputTokens > 0 || outputTokens > 0 || cachedInputTokens > 0;
|
||||
const provider = result.provider ?? "unknown";
|
||||
const biller = resolveLedgerBiller(result);
|
||||
const ledgerScope = await resolveLedgerScopeForRun(db, agent.companyId, run);
|
||||
|
||||
await db
|
||||
.update(agentRuntimeState)
|
||||
|
|
@ -1316,12 +1407,18 @@ export function heartbeatService(db: Db) {
|
|||
.where(eq(agentRuntimeState.agentId, agent.id));
|
||||
|
||||
if (additionalCostCents > 0 || hasTokenUsage) {
|
||||
const costs = costService(db);
|
||||
const costs = costService(db, budgetHooks);
|
||||
await costs.createEvent(agent.companyId, {
|
||||
heartbeatRunId: run.id,
|
||||
agentId: agent.id,
|
||||
provider: result.provider ?? "unknown",
|
||||
issueId: ledgerScope.issueId,
|
||||
projectId: ledgerScope.projectId,
|
||||
provider,
|
||||
biller,
|
||||
billingType,
|
||||
model: result.model ?? "unknown",
|
||||
inputTokens,
|
||||
cachedInputTokens,
|
||||
outputTokens,
|
||||
costCents: additionalCostCents,
|
||||
occurredAt: new Date(),
|
||||
|
|
@ -1333,6 +1430,9 @@ export function heartbeatService(db: Db) {
|
|||
return withAgentStartLock(agentId, async () => {
|
||||
const agent = await getAgent(agentId);
|
||||
if (!agent) return [];
|
||||
if (agent.status === "paused" || agent.status === "terminated" || agent.status === "pending_approval") {
|
||||
return [];
|
||||
}
|
||||
const policy = parseHeartbeatPolicy(agent);
|
||||
const runningCount = await countRunningRunsForAgent(agentId);
|
||||
const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount);
|
||||
|
|
@ -1882,8 +1982,11 @@ export function heartbeatService(db: Db) {
|
|||
freshSession: runtimeForAdapter.sessionId == null && runtimeForAdapter.sessionDisplayId == null,
|
||||
sessionRotated: sessionCompaction.rotate,
|
||||
sessionRotationReason: sessionCompaction.reason,
|
||||
provider: readNonEmptyString(adapterResult.provider) ?? "unknown",
|
||||
biller: resolveLedgerBiller(adapterResult),
|
||||
model: readNonEmptyString(adapterResult.model) ?? "unknown",
|
||||
...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
|
||||
...(adapterResult.billingType ? { billingType: adapterResult.billingType } : {}),
|
||||
billingType: normalizeLedgerBillingType(adapterResult.billingType),
|
||||
} as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
|
|
@ -2233,6 +2336,43 @@ export function heartbeatService(db: Db) {
|
|||
const agent = await getAgent(agentId);
|
||||
if (!agent) throw notFound("Agent not found");
|
||||
|
||||
const writeSkippedRequest = async (skipReason: string) => {
|
||||
await db.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason: skipReason,
|
||||
payload,
|
||||
status: "skipped",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
};
|
||||
|
||||
let projectId = readNonEmptyString(enrichedContextSnapshot.projectId);
|
||||
if (!projectId && issueId) {
|
||||
projectId = await db
|
||||
.select({ projectId: issues.projectId })
|
||||
.from(issues)
|
||||
.where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId)))
|
||||
.then((rows) => rows[0]?.projectId ?? null);
|
||||
}
|
||||
|
||||
const budgetBlock = await budgets.getInvocationBlock(agent.companyId, agentId, {
|
||||
issueId,
|
||||
projectId,
|
||||
});
|
||||
if (budgetBlock) {
|
||||
await writeSkippedRequest("budget.blocked");
|
||||
throw conflict(budgetBlock.reason, {
|
||||
scopeType: budgetBlock.scopeType,
|
||||
scopeId: budgetBlock.scopeId,
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
agent.status === "paused" ||
|
||||
agent.status === "terminated" ||
|
||||
|
|
@ -2242,21 +2382,6 @@ export function heartbeatService(db: Db) {
|
|||
}
|
||||
|
||||
const policy = parseHeartbeatPolicy(agent);
|
||||
const writeSkippedRequest = async (reason: string) => {
|
||||
await db.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason,
|
||||
payload,
|
||||
status: "skipped",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
};
|
||||
|
||||
if (source === "timer" && !policy.enabled) {
|
||||
await writeSkippedRequest("heartbeat.disabled");
|
||||
|
|
@ -2666,6 +2791,205 @@ export function heartbeatService(db: Db) {
|
|||
return newRun;
|
||||
}
|
||||
|
||||
async function listProjectScopedRunIds(companyId: string, projectId: string) {
|
||||
const runIssueId = sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`;
|
||||
const effectiveProjectId = sql<string | null>`coalesce(${heartbeatRuns.contextSnapshot} ->> 'projectId', ${issues.projectId}::text)`;
|
||||
|
||||
const rows = await db
|
||||
.selectDistinctOn([heartbeatRuns.id], { id: heartbeatRuns.id })
|
||||
.from(heartbeatRuns)
|
||||
.leftJoin(
|
||||
issues,
|
||||
and(
|
||||
eq(issues.companyId, companyId),
|
||||
sql`${issues.id}::text = ${runIssueId}`,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
sql`${effectiveProjectId} = ${projectId}`,
|
||||
),
|
||||
);
|
||||
|
||||
return rows.map((row) => row.id);
|
||||
}
|
||||
|
||||
async function listProjectScopedWakeupIds(companyId: string, projectId: string) {
|
||||
const wakeIssueId = sql<string | null>`${agentWakeupRequests.payload} ->> 'issueId'`;
|
||||
const effectiveProjectId = sql<string | null>`coalesce(${agentWakeupRequests.payload} ->> 'projectId', ${issues.projectId}::text)`;
|
||||
|
||||
const rows = await db
|
||||
.selectDistinctOn([agentWakeupRequests.id], { id: agentWakeupRequests.id })
|
||||
.from(agentWakeupRequests)
|
||||
.leftJoin(
|
||||
issues,
|
||||
and(
|
||||
eq(issues.companyId, companyId),
|
||||
sql`${issues.id}::text = ${wakeIssueId}`,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, companyId),
|
||||
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
||||
sql`${agentWakeupRequests.runId} is null`,
|
||||
sql`${effectiveProjectId} = ${projectId}`,
|
||||
),
|
||||
);
|
||||
|
||||
return rows.map((row) => row.id);
|
||||
}
|
||||
|
||||
async function cancelPendingWakeupsForBudgetScope(scope: BudgetEnforcementScope) {
|
||||
const now = new Date();
|
||||
let wakeupIds: string[] = [];
|
||||
|
||||
if (scope.scopeType === "company") {
|
||||
wakeupIds = await db
|
||||
.select({ id: agentWakeupRequests.id })
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, scope.companyId),
|
||||
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
||||
sql`${agentWakeupRequests.runId} is null`,
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id));
|
||||
} else if (scope.scopeType === "agent") {
|
||||
wakeupIds = await db
|
||||
.select({ id: agentWakeupRequests.id })
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, scope.companyId),
|
||||
eq(agentWakeupRequests.agentId, scope.scopeId),
|
||||
inArray(agentWakeupRequests.status, ["queued", "deferred_issue_execution"]),
|
||||
sql`${agentWakeupRequests.runId} is null`,
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id));
|
||||
} else {
|
||||
wakeupIds = await listProjectScopedWakeupIds(scope.companyId, scope.scopeId);
|
||||
}
|
||||
|
||||
if (wakeupIds.length === 0) return 0;
|
||||
|
||||
await db
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
status: "cancelled",
|
||||
finishedAt: now,
|
||||
error: "Cancelled due to budget pause",
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(inArray(agentWakeupRequests.id, wakeupIds));
|
||||
|
||||
return wakeupIds.length;
|
||||
}
|
||||
|
||||
async function cancelRunInternal(runId: string, reason = "Cancelled by control plane") {
|
||||
const run = await getRun(runId);
|
||||
if (!run) throw notFound("Heartbeat run not found");
|
||||
if (run.status !== "running" && run.status !== "queued") return run;
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
const graceMs = Math.max(1, running.graceSec) * 1000;
|
||||
setTimeout(() => {
|
||||
if (!running.child.killed) {
|
||||
running.child.kill("SIGKILL");
|
||||
}
|
||||
}, graceMs);
|
||||
}
|
||||
|
||||
const cancelled = await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
});
|
||||
|
||||
if (cancelled) {
|
||||
await appendRunEvent(cancelled, 1, {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "warn",
|
||||
message: "run cancelled",
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(cancelled);
|
||||
}
|
||||
|
||||
runningProcesses.delete(run.id);
|
||||
await finalizeAgentStatus(run.agentId, "cancelled");
|
||||
await startNextQueuedRunForAgent(run.agentId);
|
||||
return cancelled;
|
||||
}
|
||||
|
||||
async function cancelActiveForAgentInternal(agentId: string, reason = "Cancelled due to agent pause") {
|
||||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
|
||||
|
||||
for (const run of runs) {
|
||||
await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: reason,
|
||||
});
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
runningProcesses.delete(run.id);
|
||||
}
|
||||
await releaseIssueExecutionAndPromote(run);
|
||||
}
|
||||
|
||||
return runs.length;
|
||||
}
|
||||
|
||||
async function cancelBudgetScopeWork(scope: BudgetEnforcementScope) {
|
||||
if (scope.scopeType === "agent") {
|
||||
await cancelActiveForAgentInternal(scope.scopeId, "Cancelled due to budget pause");
|
||||
await cancelPendingWakeupsForBudgetScope(scope);
|
||||
return;
|
||||
}
|
||||
|
||||
const runIds =
|
||||
scope.scopeType === "company"
|
||||
? await db
|
||||
.select({ id: heartbeatRuns.id })
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, scope.companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
),
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.id))
|
||||
: await listProjectScopedRunIds(scope.companyId, scope.scopeId);
|
||||
|
||||
for (const runId of runIds) {
|
||||
await cancelRunInternal(runId, "Cancelled due to budget pause");
|
||||
}
|
||||
|
||||
await cancelPendingWakeupsForBudgetScope(scope);
|
||||
}
|
||||
|
||||
return {
|
||||
list: async (companyId: string, agentId?: string, limit?: number) => {
|
||||
const query = db
|
||||
|
|
@ -2838,77 +3162,11 @@ export function heartbeatService(db: Db) {
|
|||
return { checked, enqueued, skipped };
|
||||
},
|
||||
|
||||
cancelRun: async (runId: string) => {
|
||||
const run = await getRun(runId);
|
||||
if (!run) throw notFound("Heartbeat run not found");
|
||||
if (run.status !== "running" && run.status !== "queued") return run;
|
||||
cancelRun: (runId: string) => cancelRunInternal(runId),
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
const graceMs = Math.max(1, running.graceSec) * 1000;
|
||||
setTimeout(() => {
|
||||
if (!running.child.killed) {
|
||||
running.child.kill("SIGKILL");
|
||||
}
|
||||
}, graceMs);
|
||||
}
|
||||
cancelActiveForAgent: (agentId: string) => cancelActiveForAgentInternal(agentId),
|
||||
|
||||
const cancelled = await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled by control plane",
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled by control plane",
|
||||
});
|
||||
|
||||
if (cancelled) {
|
||||
await appendRunEvent(cancelled, 1, {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "warn",
|
||||
message: "run cancelled",
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(cancelled);
|
||||
}
|
||||
|
||||
runningProcesses.delete(run.id);
|
||||
await finalizeAgentStatus(run.agentId, "cancelled");
|
||||
await startNextQueuedRunForAgent(run.agentId);
|
||||
return cancelled;
|
||||
},
|
||||
|
||||
cancelActiveForAgent: async (agentId: string) => {
|
||||
const runs = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
|
||||
|
||||
for (const run of runs) {
|
||||
await setRunStatus(run.id, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled due to agent pause",
|
||||
errorCode: "cancelled",
|
||||
});
|
||||
|
||||
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
|
||||
finishedAt: new Date(),
|
||||
error: "Cancelled due to agent pause",
|
||||
});
|
||||
|
||||
const running = runningProcesses.get(run.id);
|
||||
if (running) {
|
||||
running.child.kill("SIGTERM");
|
||||
runningProcesses.delete(run.id);
|
||||
}
|
||||
await releaseIssueExecutionAndPromote(run);
|
||||
}
|
||||
|
||||
return runs.length;
|
||||
},
|
||||
cancelBudgetScopeWork,
|
||||
|
||||
getActiveRunForAgent: async (agentId: string) => {
|
||||
const [run] = await db
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue