feat(costs): add billing, quota, and budget control plane

This commit is contained in:
Dotta 2026-03-14 22:00:12 -05:00
parent 656b4659fc
commit 76e6cc08a6
91 changed files with 22406 additions and 769 deletions

View file

@ -1,14 +1,19 @@
import { and, desc, eq, gte, isNotNull, lte, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import { activityLog, agents, companies, costEvents, heartbeatRuns, issues, projects } from "@paperclipai/db";
import { activityLog, agents, companies, costEvents, issues, projects } from "@paperclipai/db";
import { notFound, unprocessable } from "../errors.js";
import { budgetService } from "./budgets.js";
export interface CostDateRange {
from?: Date;
to?: Date;
}
const METERED_BILLING_TYPE = "metered_api";
const SUBSCRIPTION_BILLING_TYPES = ["subscription_included", "subscription_overage"] as const;
export function costService(db: Db) {
const budgets = budgetService(db);
return {
createEvent: async (companyId: string, data: Omit<typeof costEvents.$inferInsert, "companyId">) => {
const agent = await db
@ -24,7 +29,13 @@ export function costService(db: Db) {
const event = await db
.insert(costEvents)
.values({ ...data, companyId })
.values({
...data,
companyId,
biller: data.biller ?? data.provider,
billingType: data.billingType ?? "unknown",
cachedInputTokens: data.cachedInputTokens ?? 0,
})
.returning()
.then((rows) => rows[0]);
@ -63,6 +74,8 @@ export function costService(db: Db) {
.where(eq(agents.id, updatedAgent.id));
}
await budgets.evaluateCostEvent(event);
return event;
},
@ -105,52 +118,31 @@ export function costService(db: Db) {
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
const costRows = await db
return db
.select({
agentId: costEvents.agentId,
agentName: agents.name,
agentStatus: agents.status,
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
apiRunCount:
sql<number>`count(distinct case when ${costEvents.billingType} = ${METERED_BILLING_TYPE} then ${costEvents.heartbeatRunId} end)::int`,
subscriptionRunCount:
sql<number>`count(distinct case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.heartbeatRunId} end)::int`,
subscriptionCachedInputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.cachedInputTokens} else 0 end), 0)::int`,
subscriptionInputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.inputTokens} else 0 end), 0)::int`,
subscriptionOutputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.outputTokens} else 0 end), 0)::int`,
})
.from(costEvents)
.leftJoin(agents, eq(costEvents.agentId, agents.id))
.where(and(...conditions))
.groupBy(costEvents.agentId, agents.name, agents.status)
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
const runConditions: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
if (range?.from) runConditions.push(gte(heartbeatRuns.startedAt, range.from));
if (range?.to) runConditions.push(lte(heartbeatRuns.startedAt, range.to));
const runRows = await db
.select({
agentId: heartbeatRuns.agentId,
apiRunCount:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'api' then 1 else 0 end), 0)::int`,
subscriptionRunCount:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then 1 else 0 end), 0)::int`,
subscriptionInputTokens:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0) else 0 end), 0)::int`,
subscriptionOutputTokens:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0) else 0 end), 0)::int`,
})
.from(heartbeatRuns)
.where(and(...runConditions))
.groupBy(heartbeatRuns.agentId);
const runRowsByAgent = new Map(runRows.map((row) => [row.agentId, row]));
return costRows.map((row) => {
const runRow = runRowsByAgent.get(row.agentId);
return {
...row,
apiRunCount: runRow?.apiRunCount ?? 0,
subscriptionRunCount: runRow?.subscriptionRunCount ?? 0,
subscriptionInputTokens: runRow?.subscriptionInputTokens ?? 0,
subscriptionOutputTokens: runRow?.subscriptionOutputTokens ?? 0,
};
});
},
byProvider: async (companyId: string, range?: CostDateRange) => {
@ -158,68 +150,62 @@ export function costService(db: Db) {
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
const costRows = await db
return db
.select({
provider: costEvents.provider,
biller: costEvents.biller,
billingType: costEvents.billingType,
model: costEvents.model,
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
apiRunCount:
sql<number>`count(distinct case when ${costEvents.billingType} = ${METERED_BILLING_TYPE} then ${costEvents.heartbeatRunId} end)::int`,
subscriptionRunCount:
sql<number>`count(distinct case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.heartbeatRunId} end)::int`,
subscriptionCachedInputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.cachedInputTokens} else 0 end), 0)::int`,
subscriptionInputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.inputTokens} else 0 end), 0)::int`,
subscriptionOutputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.outputTokens} else 0 end), 0)::int`,
})
.from(costEvents)
.where(and(...conditions))
.groupBy(costEvents.provider, costEvents.model)
.groupBy(costEvents.provider, costEvents.biller, costEvents.billingType, costEvents.model)
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
},
const runConditions: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
if (range?.from) runConditions.push(gte(heartbeatRuns.startedAt, range.from));
if (range?.to) runConditions.push(lte(heartbeatRuns.startedAt, range.to));
byBiller: async (companyId: string, range?: CostDateRange) => {
const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
const runRows = await db
return db
.select({
agentId: heartbeatRuns.agentId,
biller: costEvents.biller,
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
apiRunCount:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'api' then 1 else 0 end), 0)::int`,
sql<number>`count(distinct case when ${costEvents.billingType} = ${METERED_BILLING_TYPE} then ${costEvents.heartbeatRunId} end)::int`,
subscriptionRunCount:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then 1 else 0 end), 0)::int`,
sql<number>`count(distinct case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.heartbeatRunId} end)::int`,
subscriptionCachedInputTokens:
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.cachedInputTokens} else 0 end), 0)::int`,
subscriptionInputTokens:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0) else 0 end), 0)::int`,
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.inputTokens} else 0 end), 0)::int`,
subscriptionOutputTokens:
sql<number>`coalesce(sum(case when coalesce((${heartbeatRuns.usageJson} ->> 'billingType'), 'unknown') = 'subscription' then coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0) else 0 end), 0)::int`,
sql<number>`coalesce(sum(case when ${costEvents.billingType} in (${sql.join(SUBSCRIPTION_BILLING_TYPES.map((value) => sql`${value}`), sql`, `)}) then ${costEvents.outputTokens} else 0 end), 0)::int`,
providerCount: sql<number>`count(distinct ${costEvents.provider})::int`,
modelCount: sql<number>`count(distinct ${costEvents.model})::int`,
})
.from(heartbeatRuns)
.where(and(...runConditions))
.groupBy(heartbeatRuns.agentId);
// aggregate run billing splits across all agents (runs don't carry model info so we can't go per-model)
const totals = runRows.reduce(
(acc, r) => ({
apiRunCount: acc.apiRunCount + r.apiRunCount,
subscriptionRunCount: acc.subscriptionRunCount + r.subscriptionRunCount,
subscriptionInputTokens: acc.subscriptionInputTokens + r.subscriptionInputTokens,
subscriptionOutputTokens: acc.subscriptionOutputTokens + r.subscriptionOutputTokens,
}),
{ apiRunCount: 0, subscriptionRunCount: 0, subscriptionInputTokens: 0, subscriptionOutputTokens: 0 },
);
// pro-rate billing split across models by token share
const totalTokens = costRows.reduce((s, r) => s + r.inputTokens + r.outputTokens, 0);
return costRows.map((row) => {
const rowTokens = row.inputTokens + row.outputTokens;
const share = totalTokens > 0 ? rowTokens / totalTokens : 0;
return {
provider: row.provider,
model: row.model,
costCents: row.costCents,
inputTokens: row.inputTokens,
outputTokens: row.outputTokens,
apiRunCount: Math.round(totals.apiRunCount * share),
subscriptionRunCount: Math.round(totals.subscriptionRunCount * share),
subscriptionInputTokens: Math.round(totals.subscriptionInputTokens * share),
subscriptionOutputTokens: Math.round(totals.subscriptionOutputTokens * share),
};
});
.from(costEvents)
.where(and(...conditions))
.groupBy(costEvents.biller)
.orderBy(desc(sql`coalesce(sum(${costEvents.costCents}), 0)::int`));
},
/**
@ -240,8 +226,10 @@ export function costService(db: Db) {
const rows = await db
.select({
provider: costEvents.provider,
biller: sql<string>`case when count(distinct ${costEvents.biller}) = 1 then min(${costEvents.biller}) else 'mixed' end`,
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
})
.from(costEvents)
@ -256,10 +244,12 @@ export function costService(db: Db) {
return rows.map((row) => ({
provider: row.provider,
biller: row.biller,
window: label as string,
windowHours: hours,
costCents: row.costCents,
inputTokens: row.inputTokens,
cachedInputTokens: row.cachedInputTokens,
outputTokens: row.outputTokens,
}));
}),
@ -282,16 +272,26 @@ export function costService(db: Db) {
agentId: costEvents.agentId,
agentName: agents.name,
provider: costEvents.provider,
biller: costEvents.biller,
billingType: costEvents.billingType,
model: costEvents.model,
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
})
.from(costEvents)
.leftJoin(agents, eq(costEvents.agentId, agents.id))
.where(and(...conditions))
.groupBy(costEvents.agentId, agents.name, costEvents.provider, costEvents.model)
.orderBy(costEvents.provider, costEvents.model);
.groupBy(
costEvents.agentId,
agents.name,
costEvents.provider,
costEvents.biller,
costEvents.billingType,
costEvents.model,
)
.orderBy(costEvents.provider, costEvents.biller, costEvents.billingType, costEvents.model);
},
byProject: async (companyId: string, range?: CostDateRange) => {
@ -320,25 +320,27 @@ export function costService(db: Db) {
.orderBy(activityLog.runId, issues.projectId, desc(activityLog.createdAt))
.as("run_project_links");
const conditions: ReturnType<typeof eq>[] = [eq(heartbeatRuns.companyId, companyId)];
if (range?.from) conditions.push(gte(heartbeatRuns.startedAt, range.from));
if (range?.to) conditions.push(lte(heartbeatRuns.startedAt, range.to));
const effectiveProjectId = sql<string | null>`coalesce(${costEvents.projectId}, ${runProjectLinks.projectId})`;
const conditions: ReturnType<typeof eq>[] = [eq(costEvents.companyId, companyId)];
if (range?.from) conditions.push(gte(costEvents.occurredAt, range.from));
if (range?.to) conditions.push(lte(costEvents.occurredAt, range.to));
const costCentsExpr = sql<number>`coalesce(sum(round(coalesce((${heartbeatRuns.usageJson} ->> 'costUsd')::numeric, 0) * 100)), 0)::int`;
const costCentsExpr = sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int`;
return db
.select({
projectId: runProjectLinks.projectId,
projectId: effectiveProjectId,
projectName: projects.name,
costCents: costCentsExpr,
inputTokens: sql<number>`coalesce(sum(coalesce((${heartbeatRuns.usageJson} ->> 'inputTokens')::int, 0)), 0)::int`,
outputTokens: sql<number>`coalesce(sum(coalesce((${heartbeatRuns.usageJson} ->> 'outputTokens')::int, 0)), 0)::int`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::int`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::int`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::int`,
})
.from(runProjectLinks)
.innerJoin(heartbeatRuns, eq(runProjectLinks.runId, heartbeatRuns.id))
.innerJoin(projects, eq(runProjectLinks.projectId, projects.id))
.where(and(...conditions))
.groupBy(runProjectLinks.projectId, projects.name)
.from(costEvents)
.leftJoin(runProjectLinks, eq(costEvents.heartbeatRunId, runProjectLinks.runId))
.innerJoin(projects, sql`${projects.id} = ${effectiveProjectId}`)
.where(and(...conditions, sql`${effectiveProjectId} is not null`))
.groupBy(effectiveProjectId, projects.name)
.orderBy(desc(costCentsExpr));
},
};