mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-16 10:50:38 +09:00
Add AES-256-GCM local encrypted secrets provider with auto-generated master key, stub providers for AWS/GCP/Vault, and a secrets service that normalizes adapter configs (converting sensitive inline values to secret refs in strict mode) and resolves secret refs back to plain values at runtime. Extract redaction utilities from agent routes into shared module. Redact sensitive values in activity logs, config revisions, and approval payloads. Block rollback of revisions containing redacted secrets. Filter hidden issues from list queries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1335 lines
42 KiB
TypeScript
1335 lines
42 KiB
TypeScript
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
|
|
import type { Db } from "@paperclip/db";
|
|
import {
|
|
agents,
|
|
agentRuntimeState,
|
|
agentTaskSessions,
|
|
agentWakeupRequests,
|
|
heartbeatRunEvents,
|
|
heartbeatRuns,
|
|
costEvents,
|
|
} from "@paperclip/db";
|
|
import { conflict, notFound } from "../errors.js";
|
|
import { logger } from "../middleware/logger.js";
|
|
import { publishLiveEvent } from "./live-events.js";
|
|
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
|
|
import { getServerAdapter, runningProcesses } from "../adapters/index.js";
|
|
import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec } from "../adapters/index.js";
|
|
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
|
|
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
|
|
import { secretService } from "./secrets.js";
|
|
|
|
// Upper bound applied to a single log chunk before it is forwarded in a live
// "heartbeat.run.log" event; longer chunks keep only their tail.
// NOTE(review): the limit is compared against `chunk.length` (UTF-16 code units),
// not an encoded byte count, despite the `_BYTES` suffix.
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
|
|
|
|
/**
 * Extends a running output excerpt with a new chunk.
 *
 * Delegates to `appendWithCap` using `MAX_EXCERPT_BYTES` as the cap, so the
 * exact trimming policy is whatever that helper implements.
 *
 * @param prev - Excerpt accumulated so far.
 * @param chunk - Newly received output to add.
 * @returns The updated excerpt.
 */
function appendExcerpt(prev: string, chunk: string) {
  const capped = appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
  return capped;
}
|
|
|
|
/**
 * Optional parameters accepted when requesting that an agent be woken up.
 * Every field may be omitted.
 */
interface WakeupOptions {
  /** What kind of trigger produced the wakeup. */
  source?: "timer" | "assignment" | "on_demand" | "automation";

  /** Finer-grained description of the trigger. */
  triggerDetail?: "manual" | "ping" | "callback" | "system";

  /** Free-form reason for the wakeup. */
  reason?: string | null;

  /** Arbitrary payload recorded with the wakeup request. */
  payload?: Record<string, unknown> | null;

  /** Caller-supplied key recorded on the wakeup request. */
  idempotencyKey?: string | null;

  /** Kind of actor that asked for the wakeup. */
  requestedByActorType?: "user" | "agent" | "system";

  /** Identifier of the actor that asked for the wakeup. */
  requestedByActorId?: string | null;

  /** Initial context values for the run; copied before being extended. */
  contextSnapshot?: Record<string, unknown>;
}
|
|
|
|
/**
 * Returns `value` when it is a string containing at least one
 * non-whitespace character; otherwise returns `null`.
 *
 * The string is returned as-is (not trimmed).
 */
function readNonEmptyString(value: unknown): string | null {
  if (typeof value !== "string") return null;
  if (value.trim().length === 0) return null;
  return value;
}
|
|
|
|
/**
 * Picks the key that identifies the task a wakeup/run belongs to.
 *
 * The context snapshot is consulted before the payload, and within each
 * source the fields are tried in the order `taskKey`, `taskId`, `issueId`.
 * The first non-empty string wins.
 *
 * @returns The first non-empty string found, or `null` when none exists.
 */
function deriveTaskKey(
  contextSnapshot: Record<string, unknown> | null | undefined,
  payload: Record<string, unknown> | null | undefined,
) {
  const candidateFields = ["taskKey", "taskId", "issueId"] as const;

  for (const source of [contextSnapshot, payload]) {
    for (const field of candidateFields) {
      const found = readNonEmptyString(source?.[field]);
      if (found !== null) return found;
    }
  }

  return null;
}
|
|
|
|
/**
 * Derives the task key of a stored run from its context snapshot alone
 * (no payload is considered).
 */
function runTaskKey(run: typeof heartbeatRuns.$inferSelect) {
  const snapshot = run.contextSnapshot as Record<string, unknown> | null;
  return deriveTaskKey(snapshot, null);
}
|
|
|
|
/**
 * Reports whether two task keys denote the same scope.
 * A missing key is normalized to `null`, so two missing keys match each other.
 */
function isSameTaskScope(left: string | null, right: string | null) {
  const normalizedLeft = left ?? null;
  const normalizedRight = right ?? null;
  return normalizedLeft === normalizedRight;
}
|
|
|
|
/**
 * Limits a display identifier to at most `max` characters.
 *
 * @param value - Identifier to shorten; `null`, `undefined` and `""` yield `null`.
 * @param max - Maximum number of UTF-16 code units to keep (default 128).
 * @returns The identifier, cut to its first `max` characters when longer, or `null`.
 */
function truncateDisplayId(value: string | null | undefined, max = 128) {
  if (value === null || value === undefined || value === "") return null;
  if (value.length <= max) return value;
  return value.slice(0, max);
}
|
|
|
|
/**
 * Fallback session codec used when an adapter does not provide its own.
 * Session params are stored as-is; empty values collapse to `null`.
 */
const defaultSessionCodec: AdapterSessionCodec = {
  /**
   * Turns a stored value back into session params.
   * A non-empty result of `parseObject(raw)` is used directly; otherwise a
   * `sessionId` property read straight off `raw` is tried as a last resort.
   */
  deserialize(raw: unknown) {
    const parsed = parseObject(raw);
    if (Object.keys(parsed).length > 0) return parsed;

    const sessionId = readNonEmptyString((raw as Record<string, unknown> | null)?.sessionId);
    return sessionId ? { sessionId } : null;
  },

  /** Returns the params unchanged, or `null` when they are missing or have no keys. */
  serialize(params: Record<string, unknown> | null) {
    if (!params) return null;
    return Object.keys(params).length === 0 ? null : params;
  },

  /** The display id is the params' `sessionId` when it is a non-empty string. */
  getDisplayId(params: Record<string, unknown> | null) {
    return readNonEmptyString(params?.sessionId);
  },
};
|
|
|
|
/**
 * Looks up the session codec for an adapter type, falling back to
 * `defaultSessionCodec` when the adapter does not define one.
 */
function getAdapterSessionCodec(adapterType: string) {
  const { sessionCodec } = getServerAdapter(adapterType);
  return sessionCodec ?? defaultSessionCodec;
}
|
|
|
|
/**
 * Collapses "no session params" into a single representation.
 *
 * @returns The same object when it has at least one own enumerable key,
 *          otherwise `null` (for `null`, `undefined` and `{}`).
 */
function normalizeSessionParams(params: Record<string, unknown> | null | undefined) {
  if (params && Object.keys(params).length > 0) return params;
  return null;
}
|
|
|
|
/**
 * Works out the session state to persist after an adapter run.
 *
 * Rules, in order:
 * - `adapterResult.clearSession` wipes params, display id and legacy id.
 * - Explicit `sessionParams` from the adapter take precedence; otherwise an
 *   explicit `sessionId` is wrapped as `{ sessionId }` (or `null` when blank);
 *   otherwise the previous params are carried over.
 * - The previous display id / legacy session id are reused only when the
 *   adapter supplied none of `sessionParams`, `sessionId`, `sessionDisplayId`.
 *
 * @returns `params` (serialized through the codec), `displayId` (truncated)
 *          and `legacySessionId`.
 */
function resolveNextSessionState(input: {
  codec: AdapterSessionCodec;
  adapterResult: AdapterExecutionResult;
  previousParams: Record<string, unknown> | null;
  previousDisplayId: string | null;
  previousLegacySessionId: string | null;
}) {
  const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input;

  // An explicit clear request discards every form of session identity.
  if (adapterResult.clearSession) {
    return {
      params: null as Record<string, unknown> | null,
      displayId: null as string | null,
      legacySessionId: null as string | null,
    };
  }

  // `undefined` means "adapter said nothing"; `null`/blank means "adapter said none".
  const hasExplicitParams = adapterResult.sessionParams !== undefined;
  const hasExplicitSessionId = adapterResult.sessionId !== undefined;
  const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined;
  const explicitSessionId = readNonEmptyString(adapterResult.sessionId);
  const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId);
  const shouldUsePrevious = !(hasExplicitParams || hasExplicitSessionId || hasExplicitDisplay);

  const pickCandidateParams = () => {
    if (hasExplicitParams) return adapterResult.sessionParams;
    if (hasExplicitSessionId) {
      return explicitSessionId ? { sessionId: explicitSessionId } : null;
    }
    return previousParams;
  };

  // Round-trip through the codec so the stored form and the form used for
  // display-id derivation are both what the codec would produce.
  const serialized = normalizeSessionParams(
    codec.serialize(normalizeSessionParams(pickCandidateParams())),
  );
  const deserialized = normalizeSessionParams(codec.deserialize(serialized));

  const displayId = truncateDisplayId(
    explicitDisplayId ??
      (codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ??
      readNonEmptyString(deserialized?.sessionId) ??
      (shouldUsePrevious ? previousDisplayId : null) ??
      explicitSessionId ??
      (shouldUsePrevious ? previousLegacySessionId : null),
  );

  const legacySessionId =
    explicitSessionId ??
    readNonEmptyString(deserialized?.sessionId) ??
    displayId ??
    (shouldUsePrevious ? previousLegacySessionId : null);

  return { params: serialized, displayId, legacySessionId };
}
|
|
|
|
export function heartbeatService(db: Db) {
|
|
// Log store obtained once per service instance; run execution uses it to
// begin, append to and finalize each run's log.
const runLogStore = getRunLogStore();
|
|
// Secrets service bound to the same database; used to resolve an agent's
// adapter config before it is handed to the adapter.
const secretsSvc = secretService(db);
|
|
|
|
/** Loads an agent row by id; resolves to `null` when no row matches. */
async function getAgent(agentId: string) {
  const rows = await db.select().from(agents).where(eq(agents.id, agentId));
  return rows[0] ?? null;
}
|
|
|
|
/** Loads a heartbeat run row by id; resolves to `null` when no row matches. */
async function getRun(runId: string) {
  const rows = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
  return rows[0] ?? null;
}
|
|
|
|
/** Loads the runtime-state row of an agent; resolves to `null` when none exists. */
async function getRuntimeState(agentId: string) {
  const rows = await db
    .select()
    .from(agentRuntimeState)
    .where(eq(agentRuntimeState.agentId, agentId));
  return rows[0] ?? null;
}
|
|
|
|
/**
 * Finds the stored task session matching the given company, agent,
 * adapter type and task key.
 *
 * @returns The first matching row, or `null` when there is none.
 */
async function getTaskSession(
  companyId: string,
  agentId: string,
  adapterType: string,
  taskKey: string,
) {
  const matchesScope = and(
    eq(agentTaskSessions.companyId, companyId),
    eq(agentTaskSessions.agentId, agentId),
    eq(agentTaskSessions.adapterType, adapterType),
    eq(agentTaskSessions.taskKey, taskKey),
  );
  const rows = await db.select().from(agentTaskSessions).where(matchesScope);
  return rows[0] ?? null;
}
|
|
|
|
/**
 * Creates or updates the task session for (company, agent, adapter type, task key).
 *
 * An existing row has its session params, display id, last run id and last
 * error overwritten and `updatedAt` refreshed; otherwise a new row is inserted.
 *
 * NOTE(review): the lookup and the write are two separate statements, so this
 * is not atomic against concurrent callers — confirm whether that matters here.
 *
 * @returns The written row, or `null` if the statement returned no row.
 */
async function upsertTaskSession(input: {
  companyId: string;
  agentId: string;
  adapterType: string;
  taskKey: string;
  sessionParamsJson: Record<string, unknown> | null;
  sessionDisplayId: string | null;
  lastRunId: string | null;
  lastError: string | null;
}) {
  const { companyId, agentId, adapterType, taskKey } = input;
  const existing = await getTaskSession(companyId, agentId, adapterType, taskKey);

  if (!existing) {
    const inserted = await db
      .insert(agentTaskSessions)
      .values({
        companyId,
        agentId,
        adapterType,
        taskKey,
        sessionParamsJson: input.sessionParamsJson,
        sessionDisplayId: input.sessionDisplayId,
        lastRunId: input.lastRunId,
        lastError: input.lastError,
      })
      .returning();
    return inserted[0] ?? null;
  }

  const updated = await db
    .update(agentTaskSessions)
    .set({
      sessionParamsJson: input.sessionParamsJson,
      sessionDisplayId: input.sessionDisplayId,
      lastRunId: input.lastRunId,
      lastError: input.lastError,
      updatedAt: new Date(),
    })
    .where(eq(agentTaskSessions.id, existing.id))
    .returning();
  return updated[0] ?? null;
}
|
|
|
|
/**
 * Deletes stored task sessions of an agent.
 *
 * @param companyId - Company the agent belongs to.
 * @param agentId - Agent whose sessions are removed.
 * @param opts - Optional narrowing: only sessions with this `taskKey` and/or
 *               `adapterType` are deleted. Empty or missing values do not narrow.
 * @returns The number of rows deleted.
 */
async function clearTaskSessions(
  companyId: string,
  agentId: string,
  opts?: { taskKey?: string | null; adapterType?: string | null },
) {
  const conditions = [
    eq(agentTaskSessions.companyId, companyId),
    eq(agentTaskSessions.agentId, agentId),
    ...(opts?.taskKey ? [eq(agentTaskSessions.taskKey, opts.taskKey)] : []),
    ...(opts?.adapterType ? [eq(agentTaskSessions.adapterType, opts.adapterType)] : []),
  ];

  const deleted = await db
    .delete(agentTaskSessions)
    .where(and(...conditions))
    .returning();
  return deleted.length;
}
|
|
|
|
/**
 * Returns the agent's runtime-state row, inserting one with an empty
 * `stateJson` first if it does not exist yet.
 *
 * NOTE(review): lookup and insert are separate statements; two concurrent
 * callers could both attempt the insert — confirm a constraint covers this.
 */
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
  const current = await getRuntimeState(agent.id);
  if (current) return current;

  const [created] = await db
    .insert(agentRuntimeState)
    .values({
      agentId: agent.id,
      companyId: agent.companyId,
      adapterType: agent.adapterType,
      stateJson: {},
    })
    .returning();
  return created;
}
|
|
|
|
/**
 * Writes a new status (plus any extra column values) to a run and broadcasts
 * a "heartbeat.run.status" live event describing the updated row.
 *
 * @param runId - Run to update.
 * @param status - Status to store. Keys in `patch` are spread after it, so a
 *                 `status` inside `patch` would win.
 * @param patch - Additional columns to write in the same statement.
 * @returns The updated row, or `null` when no run has that id (no event is
 *          published in that case).
 */
async function setRunStatus(
  runId: string,
  status: string,
  patch?: Partial<typeof heartbeatRuns.$inferInsert>,
) {
  const rows = await db
    .update(heartbeatRuns)
    .set({ status, ...patch, updatedAt: new Date() })
    .where(eq(heartbeatRuns.id, runId))
    .returning();
  const updated = rows[0] ?? null;
  if (!updated) return updated;

  const toIso = (value: Date | string | null | undefined) =>
    value ? new Date(value).toISOString() : null;

  publishLiveEvent({
    companyId: updated.companyId,
    type: "heartbeat.run.status",
    payload: {
      runId: updated.id,
      agentId: updated.agentId,
      status: updated.status,
      invocationSource: updated.invocationSource,
      triggerDetail: updated.triggerDetail,
      error: updated.error ?? null,
      errorCode: updated.errorCode ?? null,
      startedAt: toIso(updated.startedAt),
      finishedAt: toIso(updated.finishedAt),
    },
  });

  return updated;
}
|
|
|
|
/**
 * Writes a new status (plus any extra column values) to a wakeup request.
 * Does nothing when no wakeup request id is given.
 */
async function setWakeupStatus(
  wakeupRequestId: string | null | undefined,
  status: string,
  patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
) {
  if (!wakeupRequestId) {
    return;
  }

  const changes = { status, ...patch, updatedAt: new Date() };
  await db
    .update(agentWakeupRequests)
    .set(changes)
    .where(eq(agentWakeupRequests.id, wakeupRequestId));
}
|
|
|
|
/**
 * Records one event for a run and broadcasts it as a "heartbeat.run.event"
 * live event.
 *
 * @param run - Run the event belongs to (supplies company, run and agent ids).
 * @param seq - Sequence number stored with the event; chosen by the caller.
 * @param event - Event details. Optional fields omitted here are published
 *                as `null` in the live event.
 */
async function appendRunEvent(
  run: typeof heartbeatRuns.$inferSelect,
  seq: number,
  event: {
    eventType: string;
    stream?: "system" | "stdout" | "stderr";
    level?: "info" | "warn" | "error";
    color?: string;
    message?: string;
    payload?: Record<string, unknown>;
  },
) {
  const { eventType, stream, level, color, message, payload } = event;

  // Persist first; the live event is only sent once the insert succeeded.
  await db.insert(heartbeatRunEvents).values({
    companyId: run.companyId,
    runId: run.id,
    agentId: run.agentId,
    seq,
    eventType,
    stream,
    level,
    color,
    message,
    payload,
  });

  publishLiveEvent({
    companyId: run.companyId,
    type: "heartbeat.run.event",
    payload: {
      runId: run.id,
      agentId: run.agentId,
      seq,
      eventType,
      stream: stream ?? null,
      level: level ?? null,
      color: color ?? null,
      message: message ?? null,
      payload: payload ?? null,
    },
  });
}
|
|
|
|
/**
 * Reads the heartbeat policy from `agent.runtimeConfig.heartbeat`.
 *
 * @returns
 * - `enabled`: defaults to `true`.
 * - `intervalSec`: defaults to `0`, never negative.
 * - `wakeOnDemand`: first defined of `wakeOnDemand`, `wakeOnAssignment`,
 *   `wakeOnOnDemand`, `wakeOnAutomation`; defaults to `true`.
 */
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
  const heartbeat = parseObject(parseObject(agent.runtimeConfig).heartbeat);

  const wakeFlag =
    heartbeat.wakeOnDemand ??
    heartbeat.wakeOnAssignment ??
    heartbeat.wakeOnOnDemand ??
    heartbeat.wakeOnAutomation;
  const requestedInterval = asNumber(heartbeat.intervalSec, 0);

  return {
    enabled: asBoolean(heartbeat.enabled, true),
    intervalSec: Math.max(0, requestedInterval),
    wakeOnDemand: asBoolean(wakeFlag, true),
  };
}
|
|
|
|
/**
 * Moves an agent to its post-run status and broadcasts an "agent.status" event.
 *
 * Succeeded and cancelled runs leave the agent "idle"; failed and timed-out
 * runs leave it in "error". Agents that are missing, "paused" or "terminated"
 * are left untouched.
 *
 * @param agentId - Agent whose run just ended.
 * @param outcome - How the run ended.
 */
async function finalizeAgentStatus(
  agentId: string,
  outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
) {
  const existing = await getAgent(agentId);
  if (!existing) return;

  // Do not override a pause/termination that happened while the run was active.
  const isFrozen = existing.status === "paused" || existing.status === "terminated";
  if (isFrozen) return;

  const nextStatus = outcome === "succeeded" || outcome === "cancelled" ? "idle" : "error";

  const rows = await db
    .update(agents)
    .set({
      status: nextStatus,
      lastHeartbeatAt: new Date(),
      updatedAt: new Date(),
    })
    .where(eq(agents.id, agentId))
    .returning();
  const updated = rows[0] ?? null;
  if (!updated) return;

  const lastHeartbeatAt = updated.lastHeartbeatAt
    ? new Date(updated.lastHeartbeatAt).toISOString()
    : null;

  publishLiveEvent({
    companyId: updated.companyId,
    type: "agent.status",
    payload: {
      agentId: updated.id,
      status: updated.status,
      lastHeartbeatAt,
      outcome,
    },
  });
}
|
|
|
|
/**
 * Fails runs that are still marked "queued" or "running" in the database but
 * have no entry in `runningProcesses`.
 *
 * For each such run: the run and its wakeup request are marked failed with
 * error code "process_lost", a lifecycle error event is appended, the agent
 * status is finalized as failed, and the agent's next queued run is started.
 *
 * NOTE(review): the loop walks a snapshot taken up front. A queued run started
 * by `startNextQueuedRunForAgent` during the loop may still appear later in
 * that snapshot as "queued" — confirm it cannot be reaped while starting.
 * NOTE(review): the lifecycle event is always appended with `seq` 1 — confirm
 * this cannot collide with events already recorded for the run.
 *
 * @param opts.staleThresholdMs - When greater than 0, only runs whose
 *        `updatedAt` is at least this many milliseconds old are reaped.
 * @returns The count and ids of the reaped runs.
 */
async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) {
  const staleThresholdMs = opts?.staleThresholdMs ?? 0;
  const now = new Date();
  const lostMessage = "Process lost -- server may have restarted";

  // Every run the database still considers active.
  const activeRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(inArray(heartbeatRuns.status, ["queued", "running"]));

  const reaped: string[] = [];

  for (const run of activeRuns) {
    // A tracked process means the run is genuinely alive.
    if (runningProcesses.has(run.id)) continue;

    // Recently touched runs are skipped to avoid false positives.
    if (staleThresholdMs > 0) {
      const lastTouchedMs = run.updatedAt ? new Date(run.updatedAt).getTime() : 0;
      const ageMs = now.getTime() - lastTouchedMs;
      if (ageMs < staleThresholdMs) continue;
    }

    await setRunStatus(run.id, "failed", {
      error: lostMessage,
      errorCode: "process_lost",
      finishedAt: now,
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: now,
      error: lostMessage,
    });

    const updatedRun = await getRun(run.id);
    if (updatedRun) {
      await appendRunEvent(updatedRun, 1, {
        eventType: "lifecycle",
        stream: "system",
        level: "error",
        message: lostMessage,
      });
    }

    await finalizeAgentStatus(run.agentId, "failed");
    await startNextQueuedRunForAgent(run.agentId);
    runningProcesses.delete(run.id);
    reaped.push(run.id);
  }

  if (reaped.length > 0) {
    logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs");
  }

  return { reaped: reaped.length, runIds: reaped };
}
|
|
|
|
/**
 * Folds the result of a finished run into the agent's runtime state and,
 * when the run had a cost, records a cost event and bumps the agent's
 * monthly spend.
 *
 * Running totals are incremented inside the UPDATE statement (the same way
 * `agents.spentMonthlyCents` is incremented below) rather than computed from
 * a previously read row, so overlapping updates cannot overwrite each other.
 * Token counts and cost that are missing or not finite numbers count as 0,
 * so a malformed adapter result cannot poison the stored totals.
 *
 * @param agent - Agent the run belongs to.
 * @param run - The finished run; its id and status are recorded as "last run".
 * @param result - Adapter result supplying usage, cost, provider/model and error.
 * @param session - Session id to store in the runtime state's `sessionId`.
 */
async function updateRuntimeState(
  agent: typeof agents.$inferSelect,
  run: typeof heartbeatRuns.$inferSelect,
  result: AdapterExecutionResult,
  session: { legacySessionId: string | null },
) {
  // Make sure the row exists so the UPDATE below always has a target.
  await ensureRuntimeState(agent);

  const finiteOrZero = (value: unknown): number =>
    typeof value === "number" && Number.isFinite(value) ? value : 0;

  const usage = result.usage;
  const inputTokens = finiteOrZero(usage?.inputTokens);
  const outputTokens = finiteOrZero(usage?.outputTokens);
  const cachedInputTokens = finiteOrZero(usage?.cachedInputTokens);
  // Cost is tracked in whole cents and never decreases a total.
  const additionalCostCents = Math.max(0, Math.round(finiteOrZero(result.costUsd) * 100));

  await db
    .update(agentRuntimeState)
    .set({
      adapterType: agent.adapterType,
      sessionId: session.legacySessionId,
      lastRunId: run.id,
      lastRunStatus: run.status,
      lastError: result.errorMessage ?? null,
      totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`,
      totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`,
      totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`,
      totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`,
      updatedAt: new Date(),
    })
    .where(eq(agentRuntimeState.agentId, agent.id));

  // Free runs leave no cost trail.
  if (additionalCostCents <= 0) return;

  await db.insert(costEvents).values({
    companyId: agent.companyId,
    agentId: agent.id,
    provider: result.provider ?? "unknown",
    model: result.model ?? "unknown",
    inputTokens,
    outputTokens,
    costCents: additionalCostCents,
    occurredAt: new Date(),
  });

  await db
    .update(agents)
    .set({
      spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`,
      updatedAt: new Date(),
    })
    .where(eq(agents.id, agent.id));
}
|
|
|
|
/**
 * Starts the agent's oldest queued run, unless one of its runs is already
 * "running".
 *
 * Execution is fire-and-forget: the run is started without being awaited and
 * a rejection is only logged.
 *
 * @returns The run that was started, or `null` when nothing was started.
 */
async function startNextQueuedRunForAgent(agentId: string) {
  const runningRows = await db
    .select({ id: heartbeatRuns.id })
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running")))
    .limit(1);
  if (runningRows[0]) return null;

  const queuedRows = await db
    .select()
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
    .orderBy(asc(heartbeatRuns.createdAt))
    .limit(1);
  const nextQueued = queuedRows[0] ?? null;
  if (!nextQueued) return null;

  // Deliberately not awaited: the caller only needs to know which run was picked.
  void executeRun(nextQueued.id).catch((err) => {
    logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed");
  });

  return nextQueued;
}
|
|
|
|
/**
 * Executes one heartbeat run end to end.
 *
 * Steps:
 * 1. Bail out if the run is missing, not "queued"/"running", or (when queued)
 *    not at the head of the agent's queue / another run is already running.
 * 2. Restore the previous session for the run's task key (falling back to the
 *    agent's runtime-state session id).
 * 3. Mark run, wakeup request and agent as running; open the run log.
 * 4. Invoke the adapter with the secret-resolved config, streaming log chunks
 *    to the log store and to live events.
 * 5. Classify the outcome, persist results, usage and the next session state,
 *    and finalize the agent status.
 * 6. On any thrown error, persist a failed run while keeping the previous
 *    session. In all cases past step 2, try to start the agent's next queued run.
 *
 * @param runId - Id of the run to execute.
 */
async function executeRun(runId: string) {
  const run = await getRun(runId);
  if (!run || (run.status !== "queued" && run.status !== "running")) return;

  const agent = await getAgent(run.agentId);
  if (!agent) {
    const notFoundMessage = "Agent not found";
    await setRunStatus(runId, "failed", {
      error: notFoundMessage,
      errorCode: "agent_not_found",
      finishedAt: new Date(),
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: new Date(),
      error: notFoundMessage,
    });
    return;
  }

  if (run.status === "queued") {
    // Only the oldest active run of an agent may start, and only when no
    // other run of that agent is currently running.
    const activeForAgent = await db
      .select()
      .from(heartbeatRuns)
      .where(
        and(
          eq(heartbeatRuns.agentId, run.agentId),
          inArray(heartbeatRuns.status, ["queued", "running"]),
        ),
      )
      .orderBy(asc(heartbeatRuns.createdAt));
    const anotherIsRunning = activeForAgent.some(
      (candidate) => candidate.status === "running" && candidate.id !== run.id,
    );
    const oldest = activeForAgent[0] ?? null;
    const isHeadOfQueue = !oldest || oldest.id === run.id;
    if (anotherIsRunning || !isHeadOfQueue) return;
  }

  // --- Restore prior session state -------------------------------------
  const runtime = await ensureRuntimeState(agent);
  const context = parseObject(run.contextSnapshot);
  const taskKey = deriveTaskKey(context, null);
  const sessionCodec = getAdapterSessionCodec(agent.adapterType);
  const taskSession = taskKey
    ? await getTaskSession(agent.companyId, agent.id, agent.adapterType, taskKey)
    : null;
  const previousSessionParams = normalizeSessionParams(
    sessionCodec.deserialize(taskSession?.sessionParamsJson ?? null),
  );
  const previousSessionDisplayId = truncateDisplayId(
    taskSession?.sessionDisplayId ??
      (sessionCodec.getDisplayId ? sessionCodec.getDisplayId(previousSessionParams) : null) ??
      readNonEmptyString(previousSessionParams?.sessionId) ??
      runtime.sessionId,
  );
  const runtimeForAdapter = {
    sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtime.sessionId,
    sessionParams: previousSessionParams,
    sessionDisplayId: previousSessionDisplayId,
    taskKey,
  };

  // Mutable state shared between the happy path and the error handler.
  let seq = 1;
  let handle: RunLogHandle | null = null;
  let stdoutExcerpt = "";
  let stderrExcerpt = "";

  try {
    // --- Mark everything as running ------------------------------------
    await setRunStatus(runId, "running", {
      startedAt: new Date(),
      sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId,
    });
    await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });

    const runningAgentRows = await db
      .update(agents)
      .set({ status: "running", updatedAt: new Date() })
      .where(eq(agents.id, agent.id))
      .returning();
    const runningAgent = runningAgentRows[0] ?? null;

    if (runningAgent) {
      publishLiveEvent({
        companyId: runningAgent.companyId,
        type: "agent.status",
        payload: {
          agentId: runningAgent.id,
          status: runningAgent.status,
          outcome: "running",
        },
      });
    }

    const currentRun = (await getRun(runId)) ?? run;
    await appendRunEvent(currentRun, seq++, {
      eventType: "lifecycle",
      stream: "system",
      level: "info",
      message: "run started",
    });

    // --- Open the run log and remember where it lives ------------------
    handle = await runLogStore.begin({
      companyId: run.companyId,
      agentId: run.agentId,
      runId,
    });

    await db
      .update(heartbeatRuns)
      .set({
        logStore: handle.store,
        logRef: handle.logRef,
        updatedAt: new Date(),
      })
      .where(eq(heartbeatRuns.id, runId));

    // Receives every stdout/stderr chunk produced by the adapter.
    const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
      if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk);
      if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk);

      if (handle) {
        await runLogStore.append(handle, {
          stream,
          chunk,
          ts: new Date().toISOString(),
        });
      }

      // Live events carry at most the tail of an oversized chunk.
      const isOversized = chunk.length > MAX_LIVE_LOG_CHUNK_BYTES;
      const payloadChunk = isOversized
        ? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
        : chunk;

      publishLiveEvent({
        companyId: run.companyId,
        type: "heartbeat.run.log",
        payload: {
          runId: run.id,
          agentId: run.agentId,
          stream,
          chunk: payloadChunk,
          truncated: payloadChunk.length !== chunk.length,
        },
      });
    };

    // --- Prepare and invoke the adapter --------------------------------
    const config = parseObject(agent.adapterConfig);
    const resolvedConfig = await secretsSvc.resolveAdapterConfigForRuntime(
      agent.companyId,
      config,
    );

    const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
      await appendRunEvent(currentRun, seq++, {
        eventType: "adapter.invoke",
        stream: "system",
        level: "info",
        message: "adapter invocation",
        payload: meta as unknown as Record<string, unknown>,
      });
    };

    const adapter = getServerAdapter(agent.adapterType);
    const authToken = adapter.supportsLocalAgentJwt
      ? createLocalAgentJwt(agent.id, agent.companyId, agent.adapterType, run.id)
      : null;
    if (adapter.supportsLocalAgentJwt && !authToken) {
      logger.warn(
        {
          companyId: agent.companyId,
          agentId: agent.id,
          runId: run.id,
          adapterType: agent.adapterType,
        },
        "local agent jwt secret missing or invalid; running without injected PAPERCLIP_API_KEY",
      );
    }

    const adapterResult = await adapter.execute({
      runId: run.id,
      agent,
      runtime: runtimeForAdapter,
      config: resolvedConfig,
      context,
      onLog,
      onMeta: onAdapterMeta,
      authToken: authToken ?? undefined,
    });

    const nextSessionState = resolveNextSessionState({
      codec: sessionCodec,
      adapterResult,
      previousParams: previousSessionParams,
      previousDisplayId: runtimeForAdapter.sessionDisplayId,
      previousLegacySessionId: runtimeForAdapter.sessionId,
    });

    // --- Classify the outcome ------------------------------------------
    // A cancellation written to the database while the adapter was running
    // takes precedence over whatever the adapter reported.
    const latestRun = await getRun(run.id);
    const exitedCleanly = (adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage;
    const outcome: "succeeded" | "failed" | "cancelled" | "timed_out" =
      latestRun?.status === "cancelled"
        ? "cancelled"
        : adapterResult.timedOut
          ? "timed_out"
          : exitedCleanly
            ? "succeeded"
            : "failed";

    let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
    if (handle) {
      logSummary = await runLogStore.finalize(handle);
    }

    // The stored run status uses the same vocabulary as the outcome.
    const status = outcome;

    const hasUsage = adapterResult.usage || adapterResult.costUsd != null;
    const usageJson = hasUsage
      ? ({
          ...(adapterResult.usage ?? {}),
          ...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
        } as Record<string, unknown>)
      : null;

    const fallbackError = outcome === "timed_out" ? "Timed out" : "Adapter failed";
    const errorCodeByOutcome = {
      succeeded: null,
      failed: "adapter_failed",
      cancelled: "cancelled",
      timed_out: "timeout",
    } as const;

    // --- Persist the result --------------------------------------------
    await setRunStatus(run.id, status, {
      finishedAt: new Date(),
      error: outcome === "succeeded" ? null : adapterResult.errorMessage ?? fallbackError,
      errorCode: errorCodeByOutcome[outcome],
      exitCode: adapterResult.exitCode,
      signal: adapterResult.signal,
      usageJson,
      resultJson: adapterResult.resultJson ?? null,
      sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId,
      stdoutExcerpt,
      stderrExcerpt,
      logBytes: logSummary?.bytes,
      logSha256: logSummary?.sha256,
      logCompressed: logSummary?.compressed ?? false,
    });

    // Wakeup requests use "completed" where runs use "succeeded".
    await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, {
      finishedAt: new Date(),
      error: adapterResult.errorMessage ?? null,
    });

    const finalizedRun = await getRun(run.id);
    if (finalizedRun) {
      await appendRunEvent(finalizedRun, seq++, {
        eventType: "lifecycle",
        stream: "system",
        level: outcome === "succeeded" ? "info" : "error",
        message: `run ${outcome}`,
        payload: {
          status,
          exitCode: adapterResult.exitCode,
        },
      });

      await updateRuntimeState(agent, finalizedRun, adapterResult, {
        legacySessionId: nextSessionState.legacySessionId,
      });

      if (taskKey) {
        const nothingToKeep = !nextSessionState.params && !nextSessionState.displayId;
        if (adapterResult.clearSession || nothingToKeep) {
          await clearTaskSessions(agent.companyId, agent.id, {
            taskKey,
            adapterType: agent.adapterType,
          });
        } else {
          await upsertTaskSession({
            companyId: agent.companyId,
            agentId: agent.id,
            adapterType: agent.adapterType,
            taskKey,
            sessionParamsJson: nextSessionState.params,
            sessionDisplayId: nextSessionState.displayId,
            lastRunId: finalizedRun.id,
            lastError:
              outcome === "succeeded" ? null : (adapterResult.errorMessage ?? "run_failed"),
          });
        }
      }
    }

    await finalizeAgentStatus(agent.id, outcome);
  } catch (err) {
    const message = err instanceof Error ? err.message : "Unknown adapter failure";
    logger.error({ err, runId }, "heartbeat execution failed");

    // Closing the log is best-effort here; a failure must not mask `err`.
    let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
    if (handle) {
      try {
        logSummary = await runLogStore.finalize(handle);
      } catch (finalizeErr) {
        logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error");
      }
    }

    const failedRun = await setRunStatus(run.id, "failed", {
      error: message,
      errorCode: "adapter_failed",
      finishedAt: new Date(),
      stdoutExcerpt,
      stderrExcerpt,
      logBytes: logSummary?.bytes,
      logSha256: logSummary?.sha256,
      logCompressed: logSummary?.compressed ?? false,
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: new Date(),
      error: message,
    });

    if (failedRun) {
      await appendRunEvent(failedRun, seq++, {
        eventType: "error",
        stream: "system",
        level: "error",
        message,
      });

      // Record the failure with a synthetic adapter result; the session the
      // run started with is kept.
      await updateRuntimeState(
        agent,
        failedRun,
        {
          exitCode: null,
          signal: null,
          timedOut: false,
          errorMessage: message,
        },
        {
          legacySessionId: runtimeForAdapter.sessionId,
        },
      );

      const hadSession = previousSessionParams || previousSessionDisplayId || taskSession;
      if (taskKey && hadSession) {
        await upsertTaskSession({
          companyId: agent.companyId,
          agentId: agent.id,
          adapterType: agent.adapterType,
          taskKey,
          sessionParamsJson: previousSessionParams,
          sessionDisplayId: previousSessionDisplayId,
          lastRunId: failedRun.id,
          lastError: message,
        });
      }
    }

    await finalizeAgentStatus(agent.id, "failed");
  } finally {
    // Whatever happened, give the agent's next queued run a chance to start.
    await startNextQueuedRunForAgent(agent.id);
  }
}
|
|
|
|
/**
 * Requests that an agent be woken up, creating a queued heartbeat run when
 * appropriate.
 *
 * Behaviour:
 * - The context snapshot is copied and missing `wakeReason`, `issueId`,
 *   `taskId`, `taskKey`, `wakeSource` and `wakeTriggerDetail` entries are
 *   filled in from the options/payload.
 * - If the agent's heartbeat policy disallows this kind of wakeup, a
 *   "skipped" wakeup request is recorded and `null` is returned.
 * - If the agent already has a queued/running run with the same task key, a
 *   "coalesced" wakeup request is recorded and that run is returned.
 * - Otherwise a "queued" wakeup request and run are inserted, a
 *   "heartbeat.run.queued" live event is published, and the agent's next
 *   queued run is started.
 *
 * @param agentId - Agent to wake up.
 * @param opts - Wakeup details; `source` defaults to "on_demand".
 * @returns The new run, the existing run the request was coalesced into, or
 *          `null` when the request was skipped.
 * @throws `notFound` when the agent does not exist; `conflict` when the agent
 *         is "paused", "terminated" or "pending_approval".
 */
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
  const source = opts.source ?? "on_demand";
  const triggerDetail = opts.triggerDetail ?? null;
  const reason = opts.reason ?? null;
  const payload = opts.payload ?? null;
  const contextSnapshot: Record<string, unknown> = { ...(opts.contextSnapshot ?? {}) };
  const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
  // Derived before the snapshot is filled in below, so it reflects caller input only.
  const taskKey = deriveTaskKey(contextSnapshot, payload);

  // Sets a snapshot entry only when a value is available and the caller did
  // not already provide a non-empty one.
  const fillIfMissing = (key: string, value: string | null) => {
    if (value && !readNonEmptyString(contextSnapshot[key])) {
      contextSnapshot[key] = value;
    }
  };
  fillIfMissing("wakeReason", reason);
  fillIfMissing("issueId", issueIdFromPayload);
  fillIfMissing("taskId", issueIdFromPayload);
  fillIfMissing("taskKey", taskKey);
  fillIfMissing("wakeSource", source);
  fillIfMissing("wakeTriggerDetail", triggerDetail);

  const agent = await getAgent(agentId);
  if (!agent) throw notFound("Agent not found");

  const notInvokable =
    agent.status === "paused" ||
    agent.status === "terminated" ||
    agent.status === "pending_approval";
  if (notInvokable) {
    throw conflict("Agent is not invokable in its current state", { status: agent.status });
  }

  // --- Policy gate ------------------------------------------------------
  const policy = parseHeartbeatPolicy(agent);

  // Records the request as skipped; the skip reason is stored in `reason`.
  const writeSkippedRequest = async (skipReason: string) => {
    await db.insert(agentWakeupRequests).values({
      companyId: agent.companyId,
      agentId,
      source,
      triggerDetail,
      reason: skipReason,
      payload,
      status: "skipped",
      requestedByActorType: opts.requestedByActorType ?? null,
      requestedByActorId: opts.requestedByActorId ?? null,
      idempotencyKey: opts.idempotencyKey ?? null,
      finishedAt: new Date(),
    });
  };

  const isTimer = source === "timer";
  if (isTimer && !policy.enabled) {
    await writeSkippedRequest("heartbeat.disabled");
    return null;
  }
  if (!isTimer && !policy.wakeOnDemand) {
    await writeSkippedRequest("heartbeat.wakeOnDemand.disabled");
    return null;
  }

  // --- Coalesce into an active run of the same task scope ----------------
  const activeRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(
      and(
        eq(heartbeatRuns.agentId, agentId),
        inArray(heartbeatRuns.status, ["queued", "running"]),
      ),
    )
    .orderBy(desc(heartbeatRuns.createdAt));

  const sameScopeRun = activeRuns.find((candidate) =>
    isSameTaskScope(runTaskKey(candidate), taskKey),
  );

  if (sameScopeRun) {
    await db.insert(agentWakeupRequests).values({
      companyId: agent.companyId,
      agentId,
      source,
      triggerDetail,
      reason,
      payload,
      status: "coalesced",
      coalescedCount: 1,
      requestedByActorType: opts.requestedByActorType ?? null,
      requestedByActorId: opts.requestedByActorId ?? null,
      idempotencyKey: opts.idempotencyKey ?? null,
      runId: sameScopeRun.id,
      finishedAt: new Date(),
    });
    return sameScopeRun;
  }

  // --- Queue a new request and run ---------------------------------------
  const [wakeupRequest] = await db
    .insert(agentWakeupRequests)
    .values({
      companyId: agent.companyId,
      agentId,
      source,
      triggerDetail,
      reason,
      payload,
      status: "queued",
      requestedByActorType: opts.requestedByActorType ?? null,
      requestedByActorId: opts.requestedByActorId ?? null,
      idempotencyKey: opts.idempotencyKey ?? null,
    })
    .returning();

  // Session id recorded on the run as the state it starts from: the task's
  // stored session when there is a task key, else the agent-wide session id.
  let sessionBefore: string | null = null;
  if (taskKey) {
    const codec = getAdapterSessionCodec(agent.adapterType);
    const existingTaskSession = await getTaskSession(
      agent.companyId,
      agent.id,
      agent.adapterType,
      taskKey,
    );
    const parsedParams = normalizeSessionParams(
      codec.deserialize(existingTaskSession?.sessionParamsJson ?? null),
    );
    sessionBefore = truncateDisplayId(
      existingTaskSession?.sessionDisplayId ??
        (codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ??
        readNonEmptyString(parsedParams?.sessionId),
    );
  } else {
    const runtimeForRun = await getRuntimeState(agent.id);
    sessionBefore = runtimeForRun?.sessionId ?? null;
  }

  const [newRun] = await db
    .insert(heartbeatRuns)
    .values({
      companyId: agent.companyId,
      agentId,
      invocationSource: source,
      triggerDetail,
      status: "queued",
      wakeupRequestId: wakeupRequest.id,
      contextSnapshot,
      sessionIdBefore: sessionBefore,
    })
    .returning();

  // Link the wakeup request back to the run it produced.
  await db
    .update(agentWakeupRequests)
    .set({
      runId: newRun.id,
      updatedAt: new Date(),
    })
    .where(eq(agentWakeupRequests.id, wakeupRequest.id));

  publishLiveEvent({
    companyId: newRun.companyId,
    type: "heartbeat.run.queued",
    payload: {
      runId: newRun.id,
      agentId: newRun.agentId,
      invocationSource: newRun.invocationSource,
      triggerDetail: newRun.triggerDetail,
      wakeupRequestId: newRun.wakeupRequestId,
    },
  });

  await startNextQueuedRunForAgent(agent.id);

  return newRun;
}
|
|
|
|
return {
|
|
/**
 * Lists heartbeat runs belonging to a company, newest first.
 * When `agentId` is supplied (and non-empty) the result is narrowed to that agent.
 */
list: (companyId: string, agentId?: string) => {
  const inCompany = eq(heartbeatRuns.companyId, companyId);
  const filter = agentId ? and(inCompany, eq(heartbeatRuns.agentId, agentId)) : inCompany;

  return db.select().from(heartbeatRuns).where(filter).orderBy(desc(heartbeatRuns.createdAt));
},
|
|
|
|
getRun,
|
|
|
|
/**
 * Returns the agent's runtime state, augmented with display fields taken from
 * the agent's most recently updated task session.
 *
 * Resolves to `null` when the agent cannot be found. When no runtime state row
 * was loaded, the value produced by `ensureRuntimeState` is used instead.
 */
getRuntimeState: async (agentId: string) => {
  const existingState = await getRuntimeState(agentId);
  const owner = await getAgent(agentId);
  if (!owner) {
    return null;
  }

  const runtime = existingState ?? (await ensureRuntimeState(owner));

  const sessionScope = and(
    eq(agentTaskSessions.companyId, owner.companyId),
    eq(agentTaskSessions.agentId, owner.id),
  );
  const [newestSession] = await db
    .select()
    .from(agentTaskSessions)
    .where(sessionScope)
    .orderBy(desc(agentTaskSessions.updatedAt))
    .limit(1);

  return {
    ...runtime,
    // Fall back to the runtime-level session id when no task session has a display id.
    sessionDisplayId: newestSession?.sessionDisplayId ?? runtime.sessionId,
    sessionParamsJson: newestSession?.sessionParamsJson ?? null,
  };
},
|
|
|
|
/**
 * Lists all task sessions of an agent within the agent's company, ordered by
 * `updatedAt` and then `createdAt`, both descending.
 *
 * @throws notFound when the agent does not exist.
 */
listTaskSessions: async (agentId: string) => {
  const owner = await getAgent(agentId);
  if (!owner) {
    throw notFound("Agent not found");
  }

  const sessionScope = and(
    eq(agentTaskSessions.companyId, owner.companyId),
    eq(agentTaskSessions.agentId, agentId),
  );

  return db
    .select()
    .from(agentTaskSessions)
    .where(sessionScope)
    .orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt));
},
|
|
|
|
/**
 * Clears stored session information for an agent.
 *
 * With a non-empty `opts.taskKey`, only the task sessions matching that key and
 * the agent's adapter type are passed to `clearTaskSessions`; otherwise the call
 * is unscoped and the runtime `stateJson` is also reset to an empty object.
 * In both cases the runtime row's `sessionId` and `lastError` are nulled.
 *
 * @returns the updated runtime row plus `clearedTaskSessions`, or `null` when
 *          the update matched no runtime row.
 * @throws notFound when the agent does not exist.
 */
resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => {
  const owner = await getAgent(agentId);
  if (!owner) {
    throw notFound("Agent not found");
  }
  await ensureRuntimeState(owner);

  const scopedTaskKey = readNonEmptyString(opts?.taskKey);
  const sessionFilter = scopedTaskKey
    ? { taskKey: scopedTaskKey, adapterType: owner.adapterType }
    : undefined;
  const clearedTaskSessions = await clearTaskSessions(owner.companyId, owner.id, sessionFilter);

  const runtimePatch: Partial<typeof agentRuntimeState.$inferInsert> = {
    sessionId: null,
    lastError: null,
    updatedAt: new Date(),
    // A full (unscoped) reset also wipes the persisted adapter state.
    ...(scopedTaskKey ? {} : { stateJson: {} }),
  };

  const [updated] = await db
    .update(agentRuntimeState)
    .set(runtimePatch)
    .where(eq(agentRuntimeState.agentId, agentId))
    .returning();

  if (!updated) {
    return null;
  }

  return {
    ...updated,
    sessionDisplayId: null,
    sessionParamsJson: null,
    clearedTaskSessions,
  };
},
|
|
|
|
/**
 * Pages through a run's events in ascending `seq` order, returning only events
 * whose `seq` is greater than `afterSeq`. `limit` is clamped to the range 1..1000.
 */
listEvents: (runId: string, afterSeq = 0, limit = 200) => {
  const pageSize = Math.max(1, Math.min(limit, 1000));
  const afterCursor = and(
    eq(heartbeatRunEvents.runId, runId),
    gt(heartbeatRunEvents.seq, afterSeq),
  );

  return db
    .select()
    .from(heartbeatRunEvents)
    .where(afterCursor)
    .orderBy(asc(heartbeatRunEvents.seq))
    .limit(pageSize);
},
|
|
|
|
/**
 * Reads a slice of a run's log through `runLogStore`.
 *
 * @param opts optional `offset` / `limitBytes`, forwarded unchanged to the store.
 * @returns the store's read result merged with `runId`, `store` and `logRef`.
 * @throws notFound when the run does not exist or has no log store / log ref recorded.
 */
readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
  const run = await getRun(runId);
  if (!run) {
    throw notFound("Heartbeat run not found");
  }

  const { logStore, logRef } = run;
  if (!logStore || !logRef) {
    throw notFound("Run log not found");
  }

  const handle = { store: logStore as "local_file", logRef };
  const chunk = await runLogStore.read(handle, opts);

  return {
    runId,
    store: logStore,
    logRef,
    ...chunk,
  };
},
|
|
|
|
/**
 * Convenience wrapper around `enqueueWakeup` taking positional arguments.
 * Defaults describe a manual, on-demand invocation with an empty context snapshot.
 * A missing `actor.actorId` is forwarded as `null`.
 */
invoke: async (
  agentId: string,
  source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
  contextSnapshot: Record<string, unknown> = {},
  triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
  actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
) => {
  const requestedByActorType = actor?.actorType;
  const requestedByActorId = actor?.actorId ?? null;

  return enqueueWakeup(agentId, {
    source,
    triggerDetail,
    contextSnapshot,
    requestedByActorType,
    requestedByActorId,
  });
},
|
|
|
|
wakeup: enqueueWakeup,
|
|
|
|
reapOrphanedRuns,
|
|
|
|
/**
 * Scheduler tick: walks every agent and enqueues a timer wakeup for each one
 * whose heartbeat interval has elapsed since `lastHeartbeatAt`.
 *
 * Paused or terminated agents, and agents whose policy is disabled or has a
 * non-positive interval, are ignored and not counted. Agents without a
 * `lastHeartbeatAt` are treated as due.
 *
 * @param now reference time for the tick (defaults to the current time).
 * @returns counters: `checked` (eligible agents inspected), `enqueued`
 *          (wakeups that yielded a run) and `skipped` (wakeups that returned nothing).
 */
tickTimers: async (now = new Date()) => {
  const candidates = await db.select().from(agents);
  const tally = { checked: 0, enqueued: 0, skipped: 0 };
  const nowMs = now.getTime();

  for (const candidate of candidates) {
    if (candidate.status === "paused" || candidate.status === "terminated") {
      continue;
    }

    const heartbeat = parseHeartbeatPolicy(candidate);
    if (!heartbeat.enabled || heartbeat.intervalSec <= 0) {
      continue;
    }

    tally.checked += 1;

    // A missing timestamp maps to 0, which the guard below treats as "never ran" (always due).
    const lastBeatMs = candidate.lastHeartbeatAt ? new Date(candidate.lastHeartbeatAt).getTime() : 0;
    const sinceLastMs = nowMs - lastBeatMs;
    if (lastBeatMs && sinceLastMs < heartbeat.intervalSec * 1000) {
      continue;
    }

    const queuedRun = await enqueueWakeup(candidate.id, {
      source: "timer",
      triggerDetail: "system",
      reason: "heartbeat_timer",
      requestedByActorType: "system",
      requestedByActorId: "heartbeat_scheduler",
      contextSnapshot: {
        source: "scheduler",
        reason: "interval_elapsed",
        now: now.toISOString(),
      },
    });

    tally[queuedRun ? "enqueued" : "skipped"] += 1;
  }

  return tally;
},
|
|
|
|
/**
 * Cancels a queued or running heartbeat run.
 *
 * If a child process is registered for the run it is sent SIGTERM and, should
 * it still be alive after the adapter's grace period (at least one second),
 * SIGKILL. The run and its wakeup request are marked "cancelled", a lifecycle
 * event is appended, the agent status is finalized and the agent's next queued
 * run is started.
 *
 * @returns the run unchanged when it is not queued/running, otherwise the
 *          result of marking it cancelled.
 * @throws notFound when the run does not exist.
 */
cancelRun: async (runId: string) => {
  const run = await getRun(runId);
  if (!run) throw notFound("Heartbeat run not found");
  if (run.status !== "running" && run.status !== "queued") return run;

  const running = runningProcesses.get(run.id);
  if (running) {
    const { child } = running;
    child.kill("SIGTERM");
    const graceMs = Math.max(1, running.graceSec) * 1000;
    setTimeout(() => {
      // `child.killed` only reports that a signal was delivered by kill(), so it
      // is already true after the SIGTERM above and cannot be used to decide
      // whether to escalate. Escalate only while the process has neither exited
      // nor been terminated by a signal.
      if (child.exitCode === null && child.signalCode === null) {
        child.kill("SIGKILL");
      }
    }, graceMs);
  }

  const cancelled = await setRunStatus(run.id, "cancelled", {
    finishedAt: new Date(),
    error: "Cancelled by control plane",
    errorCode: "cancelled",
  });

  await setWakeupStatus(run.wakeupRequestId, "cancelled", {
    finishedAt: new Date(),
    error: "Cancelled by control plane",
  });

  if (cancelled) {
    await appendRunEvent(cancelled, 1, {
      eventType: "lifecycle",
      stream: "system",
      level: "warn",
      message: "run cancelled",
    });
  }

  // The escalation timer above keeps its own reference to the child, so the
  // registry entry can be dropped immediately.
  runningProcesses.delete(run.id);
  await finalizeAgentStatus(run.agentId, "cancelled");
  await startNextQueuedRunForAgent(run.agentId);
  return cancelled;
},
|
|
|
|
/**
 * Cancels every queued or running run of an agent, one after another.
 *
 * Each run and its wakeup request are marked "cancelled"; a registered child
 * process, if any, is sent SIGTERM and removed from `runningProcesses`.
 *
 * @returns the number of runs that were cancelled.
 */
cancelActiveForAgent: async (agentId: string) => {
  const activeRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));

  for (const activeRun of activeRuns) {
    const { id: activeRunId, wakeupRequestId } = activeRun;

    await setRunStatus(activeRunId, "cancelled", {
      finishedAt: new Date(),
      error: "Cancelled due to agent pause",
      errorCode: "cancelled",
    });

    await setWakeupStatus(wakeupRequestId, "cancelled", {
      finishedAt: new Date(),
      error: "Cancelled due to agent pause",
    });

    const tracked = runningProcesses.get(activeRunId);
    if (!tracked) {
      continue;
    }
    tracked.child.kill("SIGTERM");
    runningProcesses.delete(activeRunId);
  }

  return activeRuns.length;
},
|
|
|
|
/**
 * Returns the agent's run in "running" status with the latest `startedAt`,
 * or `null` when the agent has no running run.
 */
getActiveRunForAgent: async (agentId: string) => {
  const runningForAgent = and(
    eq(heartbeatRuns.agentId, agentId),
    eq(heartbeatRuns.status, "running"),
  );

  const matches = await db
    .select()
    .from(heartbeatRuns)
    .where(runningForAgent)
    .orderBy(desc(heartbeatRuns.startedAt))
    .limit(1);

  return matches[0] ?? null;
},
|
|
};
|
|
}
|