2026-02-25 08:38:58 -06:00
import fs from "node:fs/promises" ;
2026-03-05 06:14:32 -06:00
import path from "node:path" ;
2026-02-17 12:24:43 -06:00
import { and , asc , desc , eq , gt , inArray , sql } from "drizzle-orm" ;
2026-03-03 08:45:26 -06:00
import type { Db } from "@paperclipai/db" ;
2026-02-17 12:24:43 -06:00
import {
agents ,
agentRuntimeState ,
2026-02-19 14:02:17 -06:00
agentTaskSessions ,
2026-02-17 12:24:43 -06:00
agentWakeupRequests ,
heartbeatRunEvents ,
heartbeatRuns ,
2026-02-20 15:48:22 -06:00
issues ,
2026-03-10 09:03:31 -05:00
projects ,
2026-02-25 08:38:58 -06:00
projectWorkspaces ,
2026-03-03 08:45:26 -06:00
} from "@paperclipai/db" ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
import { conflict , notFound } from "../errors.js" ;
import { logger } from "../middleware/logger.js" ;
2026-02-17 12:24:43 -06:00
import { publishLiveEvent } from "./live-events.js" ;
import { getRunLogStore , type RunLogHandle } from "./run-log-store.js" ;
2026-02-18 13:53:03 -06:00
import { getServerAdapter , runningProcesses } from "../adapters/index.js" ;
2026-03-13 08:49:11 -05:00
import type { AdapterExecutionResult , AdapterInvocationMeta , AdapterSessionCodec , UsageSummary } from "../adapters/index.js" ;
2026-02-18 16:46:45 -06:00
import { createLocalAgentJwt } from "../agent-auth-jwt.js" ;
2026-02-18 13:53:03 -06:00
import { parseObject , asBoolean , asNumber , appendWithCap , MAX_EXCERPT_BYTES } from "../adapters/utils.js" ;
2026-03-09 14:45:09 +10:00
import { costService } from "./costs.js" ;
2026-03-15 07:05:01 -05:00
import { companySkillService } from "./company-skills.js" ;
2026-02-19 15:43:52 -06:00
import { secretService } from "./secrets.js" ;
2026-02-25 08:38:58 -06:00
import { resolveDefaultAgentWorkspaceDir } from "../home-paths.js" ;
2026-03-11 17:23:33 -05:00
import { summarizeHeartbeatRunResultJson } from "./heartbeat-run-summary.js" ;
2026-03-10 10:58:38 -05:00
import {
buildWorkspaceReadyComment ,
ensureRuntimeServicesForRun ,
persistAdapterManagedRuntimeServices ,
realizeExecutionWorkspace ,
releaseRuntimeServicesForRun ,
} from "./workspace-runtime.js" ;
import { issueService } from "./issues.js" ;
2026-03-10 09:03:31 -05:00
import {
buildExecutionWorkspaceAdapterConfig ,
parseIssueExecutionWorkspaceSettings ,
parseProjectExecutionWorkspacePolicy ,
resolveExecutionWorkspaceMode ,
} from "./execution-workspace-policy.js" ;
2026-03-11 17:46:23 -05:00
import { redactCurrentUserText , redactCurrentUserValue } from "../log-redaction.js" ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
2026-02-18 13:53:03 -06:00
// 8 KiB. Not referenced in the visible part of this file; presumably the
// per-chunk size cap for live log events — TODO confirm at the use site.
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024 ;
2026-02-20 12:50:34 -06:00
// Bounds used by normalizeMaxConcurrentRuns: the default doubles as the lower
// bound of the clamp, and the max is the upper bound.
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1 ;
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10 ;
2026-02-20 15:48:22 -06:00
// NOTE(review): not referenced in the visible part of this file; presumably the
// key under which a deferred wake's context is stored — confirm at the use site.
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext" ;
2026-02-20 12:50:34 -06:00
// Per-agent tail of the promise chain built by withAgentStartLock; keyed by
// agent id and removed again once the last queued callback has settled.
const startLocksByAgent = new Map < string , Promise < void > > ( ) ;
2026-02-25 21:35:33 -06:00
// Placeholder cwd value: resolveWorkspaceForRun skips any project workspace
// whose cwd equals this string instead of treating it as a real directory.
const REPO_ONLY_CWD_SENTINEL = "/__paperclip_repo_only__" ;
2026-03-13 08:49:11 -05:00
// Adapter types for which parseSessionCompactionPolicy turns session
// compaction on by default when the agent config does not set `enabled`.
const SESSIONED_LOCAL_ADAPTERS = new Set ( [
"claude_local" ,
"codex_local" ,
"cursor" ,
"gemini_local" ,
"opencode_local" ,
"pi_local" ,
] ) ;
2026-02-17 12:24:43 -06:00
2026-03-10 21:16:33 -05:00
/**
 * Column projection over `heartbeatRuns`.
 *
 * Every key maps to the column of the same name except `stdoutExcerpt` and
 * `stderrExcerpt`, which are selected as SQL NULL: the keys stay present on
 * the row but the excerpt text is never read from the table.
 * NOTE(review): presumably used for list-style queries, per the name — confirm.
 */
const heartbeatRunListColumns = {
  id: heartbeatRuns.id,
  companyId: heartbeatRuns.companyId,
  agentId: heartbeatRuns.agentId,
  invocationSource: heartbeatRuns.invocationSource,
  triggerDetail: heartbeatRuns.triggerDetail,
  status: heartbeatRuns.status,
  startedAt: heartbeatRuns.startedAt,
  finishedAt: heartbeatRuns.finishedAt,
  error: heartbeatRuns.error,
  wakeupRequestId: heartbeatRuns.wakeupRequestId,
  exitCode: heartbeatRuns.exitCode,
  signal: heartbeatRuns.signal,
  usageJson: heartbeatRuns.usageJson,
  resultJson: heartbeatRuns.resultJson,
  sessionIdBefore: heartbeatRuns.sessionIdBefore,
  sessionIdAfter: heartbeatRuns.sessionIdAfter,
  logStore: heartbeatRuns.logStore,
  logRef: heartbeatRuns.logRef,
  logBytes: heartbeatRuns.logBytes,
  logSha256: heartbeatRuns.logSha256,
  logCompressed: heartbeatRuns.logCompressed,
  stdoutExcerpt: sql<string | null>`NULL`.as("stdoutExcerpt"),
  stderrExcerpt: sql<string | null>`NULL`.as("stderrExcerpt"),
  errorCode: heartbeatRuns.errorCode,
  externalRunId: heartbeatRuns.externalRunId,
  contextSnapshot: heartbeatRuns.contextSnapshot,
  createdAt: heartbeatRuns.createdAt,
  updatedAt: heartbeatRuns.updatedAt,
} as const;
2026-02-18 13:53:03 -06:00
/**
 * Appends `chunk` to the excerpt accumulated so far.
 *
 * Delegates to `appendWithCap` with `MAX_EXCERPT_BYTES` as the cap.
 *
 * @param prev  Excerpt accumulated so far.
 * @param chunk New output to append.
 * @returns Whatever `appendWithCap` returns for the combined text.
 */
function appendExcerpt(prev: string, chunk: string) {
  return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
}
2026-02-20 12:50:34 -06:00
/**
 * Turns a loosely-typed "max concurrent runs" setting into a usable integer.
 *
 * The value is read through `asNumber` (falling back to the default), floored,
 * and clamped to the range
 * [HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, HEARTBEAT_MAX_CONCURRENT_RUNS_MAX].
 * A non-finite result yields the default.
 */
function normalizeMaxConcurrentRuns(value: unknown) {
  const requested = Math.floor(asNumber(value, HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT));
  if (Number.isFinite(requested)) {
    const cappedAtMax = Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, requested);
    return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, cappedAtMax);
  }
  return HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT;
}
/**
 * Runs `fn` after every previously queued callback for the same agent has
 * settled, so callbacks for one agent id never overlap.
 *
 * @param agentId Key of the per-agent queue in `startLocksByAgent`.
 * @param fn      Work to run once the queue ahead of it has drained.
 * @returns The value `fn` resolves with; a rejection from `fn` propagates.
 */
async function withAgentStartLock<T>(agentId: string, fn: () => Promise<T>): Promise<T> {
  const previous = startLocksByAgent.get(agentId) ?? Promise.resolve();
  const run = previous.then(fn);
  // The marker settles when `run` does but never rejects, so a failure in one
  // caller does not poison the chain for the next caller.
  const marker = run.then(
    () => undefined,
    () => undefined,
  );
  startLocksByAgent.set(agentId, marker);
  try {
    return await run;
  } finally {
    // Only the tail of the chain cleans up; if someone queued behind us the
    // map already points at their marker and must be left alone.
    if (startLocksByAgent.get(agentId) === marker) {
      startLocksByAgent.delete(agentId);
    }
  }
}
2026-02-17 12:24:43 -06:00
/** Options describing why and on whose behalf an agent wakeup is requested. */
interface WakeupOptions {
  /** What kind of event produced the wakeup. */
  source?:
    | "timer"
    | "assignment"
    | "on_demand"
    | "automation";
  /** Finer-grained trigger classification. */
  triggerDetail?:
    | "manual"
    | "ping"
    | "callback"
    | "system";
  /** Free-form reason for the wakeup. */
  reason?: string | null;
  /** Arbitrary payload; helpers in this file read `issueId`, `taskId`, `taskKey` and `commentId` from it. */
  payload?: Record<string, unknown> | null;
  idempotencyKey?: string | null;
  requestedByActorType?:
    | "user"
    | "agent"
    | "system";
  requestedByActorId?: string | null;
  /** Context carried with the wakeup. */
  contextSnapshot?: Record<string, unknown>;
}
2026-03-13 08:49:11 -05:00
/** Token counters for a run; every field is a non-negative integer when produced by the helpers in this file. */
type UsageTotals = {
  /** Input tokens. */
  inputTokens: number;
  /** Cached input tokens. */
  cachedInputTokens: number;
  /** Output tokens. */
  outputTokens: number;
};
/**
 * Thresholds that decide when a session should be rotated.
 * evaluateSessionCompaction only applies a limit when it is greater than 0.
 */
type SessionCompactionPolicy = {
  /** Master switch; when false no rotation is ever proposed. */
  enabled: boolean;
  /** Rotate once the session has more runs than this. */
  maxSessionRuns: number;
  /** Rotate once the latest run's raw input tokens reach this value. */
  maxRawInputTokens: number;
  /** Rotate once the span between the oldest and latest run reaches this many hours. */
  maxSessionAgeHours: number;
};
/** Outcome of evaluateSessionCompaction. */
type SessionCompactionDecision = {
  /** True when the session should be rotated. */
  rotate: boolean;
  /** Human-readable rotation reason; null when not rotating. */
  reason: string | null;
  /** Markdown handoff note for the next session; null when not rotating. */
  handoffMarkdown: string | null;
  /** Id of the latest run found for the session, if any was found. */
  previousRunId: string | null;
};
2026-02-26 10:32:44 -06:00
/** Result shape of parseIssueAssigneeAdapterOverrides. */
interface ParsedIssueAssigneeAdapterOverrides {
  /** Non-empty adapter config override, or null when none was supplied. */
  adapterConfig: Record<string, unknown> | null;
  /** Explicit boolean override, or null when the raw value was not a boolean. */
  useProjectWorkspace: boolean | null;
}
2026-03-05 06:14:32 -06:00
/** One entry per project workspace row considered while resolving the run's workspace. */
type ResolvedWorkspaceHint = {
  workspaceId: string;
  cwd: string | null;
  repoUrl: string | null;
  repoRef: string | null;
};

/** Working directory chosen for a run, plus where it came from. */
export type ResolvedWorkspaceForRun = {
  /** Directory the run should execute in. */
  cwd: string;
  /** Which resolution path produced `cwd`. */
  source:
    | "project_primary"
    | "task_session"
    | "agent_home";
  projectId: string | null;
  workspaceId: string | null;
  repoUrl: string | null;
  repoRef: string | null;
  workspaceHints: Array<ResolvedWorkspaceHint>;
  /** Human-readable notes, e.g. when a fallback directory had to be used. */
  warnings: string[];
};
2026-02-19 09:09:40 -06:00
/**
 * Returns `value` unchanged when it is a string containing at least one
 * non-whitespace character; otherwise returns null.
 * The returned string is NOT trimmed.
 */
function readNonEmptyString(value: unknown): string | null {
  if (typeof value !== "string") return null;
  const isBlank = value.trim().length === 0;
  return isBlank ? null : value;
}
2026-03-13 08:49:11 -05:00
/**
 * Converts an adapter usage summary into `UsageTotals`.
 *
 * Each counter is read through `asNumber` (default 0), floored, and clamped
 * to be at least 0. Returns null when no usage was supplied.
 */
function normalizeUsageTotals(usage: UsageSummary | null | undefined): UsageTotals | null {
  if (!usage) return null;
  const toCount = (raw: unknown) => Math.max(0, Math.floor(asNumber(raw, 0)));
  const inputTokens = toCount(usage.inputTokens);
  const cachedInputTokens = toCount(usage.cachedInputTokens);
  const outputTokens = toCount(usage.outputTokens);
  return { inputTokens, cachedInputTokens, outputTokens };
}
/**
 * Reads raw token totals out of a stored usage JSON value.
 *
 * For each counter the `raw*` field is preferred and the plain field is the
 * fallback; the result is floored and clamped to be at least 0.
 * Returns null when the parsed object is empty or all three counters are <= 0.
 */
function readRawUsageTotals(usageJson: unknown): UsageTotals | null {
  const usage = parseObject(usageJson);
  if (Object.keys(usage).length === 0) return null;

  const readCounter = (preferred: unknown, fallback: unknown) =>
    Math.max(0, Math.floor(asNumber(preferred, asNumber(fallback, 0))));

  const totals: UsageTotals = {
    inputTokens: readCounter(usage.rawInputTokens, usage.inputTokens),
    cachedInputTokens: readCounter(usage.rawCachedInputTokens, usage.cachedInputTokens),
    outputTokens: readCounter(usage.rawOutputTokens, usage.outputTokens),
  };

  const nothingRecorded =
    totals.inputTokens <= 0 && totals.cachedInputTokens <= 0 && totals.outputTokens <= 0;
  return nothingRecorded ? null : totals;
}
/**
 * Derives per-run usage from running totals.
 *
 * - No `current`: returns null.
 * - No `previous`: returns a copy of `current`.
 * - Otherwise each counter is `current - previous`; when a counter went
 *   backwards (current < previous) the current value is used as-is.
 *   Every counter in the result is clamped to be at least 0.
 *
 * @param current  Totals reported by the run being recorded.
 * @param previous Totals reported by the preceding run, if any.
 */
function deriveNormalizedUsageDelta(current: UsageTotals | null, previous: UsageTotals | null): UsageTotals | null {
  if (!current) return null;
  if (!previous) return { ...current };

  // A counter that went backwards cannot be subtracted meaningfully, so the
  // current value stands in for the delta.
  const counterDelta = (now: number, before: number) =>
    Math.max(0, now >= before ? now - before : now);

  return {
    inputTokens: counterDelta(current.inputTokens, previous.inputTokens),
    cachedInputTokens: counterDelta(current.cachedInputTokens, previous.cachedInputTokens),
    outputTokens: counterDelta(current.outputTokens, previous.outputTokens),
  };
}
/**
 * Formats a count with en-US digit grouping (e.g. 1,234,567).
 * Anything that is not a finite number is rendered as "0".
 */
function formatCount(value: number | null | undefined) {
  if (typeof value === "number" && Number.isFinite(value)) {
    return value.toLocaleString("en-US");
  }
  return "0";
}
/**
 * Builds the session compaction policy for an agent from its runtime config.
 *
 * The settings object is taken from the first defined of
 * `runtimeConfig.heartbeat.sessionCompaction`,
 * `runtimeConfig.heartbeat.sessionRotation`, and
 * `runtimeConfig.sessionCompaction`.
 *
 * `enabled` defaults to whether the agent's adapter type is listed in
 * `SESSIONED_LOCAL_ADAPTERS`. The numeric limits default to 200 runs,
 * 2,000,000 raw input tokens and 72 hours; each is floored and clamped to be
 * at least 0.
 */
function parseSessionCompactionPolicy(agent: typeof agents.$inferSelect): SessionCompactionPolicy {
  const runtimeConfig = parseObject(agent.runtimeConfig);
  const heartbeat = parseObject(runtimeConfig.heartbeat);
  const compaction = parseObject(
    heartbeat.sessionCompaction ?? heartbeat.sessionRotation ?? runtimeConfig.sessionCompaction,
  );
  const supportsSessions = SESSIONED_LOCAL_ADAPTERS.has(agent.adapterType);
  const enabled = compaction.enabled === undefined
    ? supportsSessions
    : asBoolean(compaction.enabled, supportsSessions);
  const readLimit = (value: unknown, fallback: number) =>
    Math.max(0, Math.floor(asNumber(value, fallback)));
  return {
    enabled,
    maxSessionRuns: readLimit(compaction.maxSessionRuns, 200),
    maxRawInputTokens: readLimit(compaction.maxRawInputTokens, 2_000_000),
    maxSessionAgeHours: readLimit(compaction.maxSessionAgeHours, 72),
  };
}
2026-03-05 06:14:32 -06:00
/**
 * Decides whether a saved session that was recorded in the agent's fallback
 * home directory should be re-pointed at a project workspace.
 *
 * The previous params are returned untouched (with `warning: null`) unless
 * ALL of the following hold:
 *  - the previous params carry a non-empty `sessionId` and `cwd`;
 *  - the resolved workspace source is `"project_primary"` with a non-empty cwd;
 *  - the previous cwd resolves to the agent's default workspace directory;
 *  - the project cwd resolves to a different path than the previous cwd;
 *  - the previous and resolved workspace ids do not disagree (a missing id on
 *    either side does not count as a disagreement).
 *
 * In that case a copy of the params is returned with `cwd` replaced by the
 * project cwd, and with `workspaceId` / `repoUrl` / `repoRef` overwritten by
 * the resolved workspace's values when those are set, together with a warning
 * describing the migration.
 *
 * @returns `{ sessionParams, warning }`; `warning` is non-null only when the
 *          params were migrated.
 */
export function resolveRuntimeSessionParamsForWorkspace(input: {
  agentId: string;
  previousSessionParams: Record<string, unknown> | null;
  resolvedWorkspace: ResolvedWorkspaceForRun;
}) {
  const { agentId, previousSessionParams, resolvedWorkspace } = input;
  const unchanged = {
    sessionParams: previousSessionParams,
    warning: null as string | null,
  };

  const previousSessionId = readNonEmptyString(previousSessionParams?.sessionId);
  const previousCwd = readNonEmptyString(previousSessionParams?.cwd);
  if (!previousSessionId || !previousCwd) return unchanged;
  if (resolvedWorkspace.source !== "project_primary") return unchanged;

  const projectCwd = readNonEmptyString(resolvedWorkspace.cwd);
  if (!projectCwd) return unchanged;

  // Only sessions that were saved in the fallback agent-home directory are migrated.
  const fallbackAgentHomeCwd = resolveDefaultAgentWorkspaceDir(agentId);
  if (path.resolve(previousCwd) !== path.resolve(fallbackAgentHomeCwd)) return unchanged;
  if (path.resolve(projectCwd) === path.resolve(previousCwd)) return unchanged;

  const previousWorkspaceId = readNonEmptyString(previousSessionParams?.workspaceId);
  if (
    previousWorkspaceId &&
    resolvedWorkspace.workspaceId &&
    previousWorkspaceId !== resolvedWorkspace.workspaceId
  ) {
    return unchanged;
  }

  const migratedSessionParams: Record<string, unknown> = {
    ...(previousSessionParams ?? {}),
    cwd: projectCwd,
  };
  if (resolvedWorkspace.workspaceId) migratedSessionParams.workspaceId = resolvedWorkspace.workspaceId;
  if (resolvedWorkspace.repoUrl) migratedSessionParams.repoUrl = resolvedWorkspace.repoUrl;
  if (resolvedWorkspace.repoRef) migratedSessionParams.repoRef = resolvedWorkspace.repoRef;

  return {
    sessionParams: migratedSessionParams,
    warning:
      `Project workspace "${projectCwd}" is now available. ` +
      `Attempting to resume session "${previousSessionId}" that was previously saved in fallback workspace "${previousCwd}".`,
  };
}
2026-02-26 10:32:44 -06:00
/**
 * Extracts the per-issue assignee adapter overrides from a raw JSON value.
 *
 * `adapterConfig` is kept only when it parses to a non-empty object and
 * `useProjectWorkspace` only when it is a real boolean. Returns null when
 * neither override is present.
 */
function parseIssueAssigneeAdapterOverrides(
  raw: unknown,
): ParsedIssueAssigneeAdapterOverrides | null {
  const overrides = parseObject(raw);

  const configCandidate = parseObject(overrides.adapterConfig);
  const hasAdapterConfig = Object.keys(configCandidate).length > 0;

  const workspaceFlag = overrides.useProjectWorkspace;
  const useProjectWorkspace = typeof workspaceFlag === "boolean" ? workspaceFlag : null;

  if (!hasAdapterConfig && useProjectWorkspace === null) {
    return null;
  }
  return {
    adapterConfig: hasAdapterConfig ? configCandidate : null,
    useProjectWorkspace,
  };
}
2026-02-19 14:02:17 -06:00
/**
 * Picks the task key for a wake/run.
 *
 * Returns the first non-empty string among, in order: the context snapshot's
 * `taskKey`, `taskId`, `issueId`, then the payload's `taskKey`, `taskId`,
 * `issueId`. Returns null when none is set.
 */
function deriveTaskKey(
  contextSnapshot: Record<string, unknown> | null | undefined,
  payload: Record<string, unknown> | null | undefined,
) {
  return (
    readNonEmptyString(contextSnapshot?.taskKey) ??
    readNonEmptyString(contextSnapshot?.taskId) ??
    readNonEmptyString(contextSnapshot?.issueId) ??
    readNonEmptyString(payload?.taskKey) ??
    readNonEmptyString(payload?.taskId) ??
    readNonEmptyString(payload?.issueId) ??
    null
  );
}
2026-03-05 06:54:36 -06:00
/**
 * Tells whether a wake should start from a fresh task session.
 *
 * True when the context snapshot has `forceFreshSession === true` or its
 * `wakeReason` is `"issue_assigned"`; false otherwise (including when no
 * snapshot is given).
 */
export function shouldResetTaskSessionForWake(
  contextSnapshot: Record<string, unknown> | null | undefined,
) {
  if (contextSnapshot?.forceFreshSession === true) return true;

  const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason);
  if (wakeReason === "issue_assigned") return true;
  return false;
}
/**
 * Human-readable explanation of why a wake resets the task session.
 *
 * Mirrors the conditions of shouldResetTaskSessionForWake, in the same order;
 * returns null when neither condition applies.
 */
function describeSessionResetReason(
  contextSnapshot: Record<string, unknown> | null | undefined,
) {
  if (contextSnapshot?.forceFreshSession === true) return "forceFreshSession was requested";

  const wakeReason = readNonEmptyString(contextSnapshot?.wakeReason);
  if (wakeReason === "issue_assigned") return "wake reason is issue_assigned";
  return null;
}
2026-02-20 10:32:17 -06:00
/**
 * Picks the comment id associated with a wake.
 *
 * Returns the first non-empty string among the context snapshot's
 * `wakeCommentId`, its `commentId`, and the payload's `commentId`;
 * null when none is set.
 */
function deriveCommentId(
  contextSnapshot: Record<string, unknown> | null | undefined,
  payload: Record<string, unknown> | null | undefined,
) {
  return (
    readNonEmptyString(contextSnapshot?.wakeCommentId) ??
    readNonEmptyString(contextSnapshot?.commentId) ??
    readNonEmptyString(payload?.commentId) ??
    null
  );
}
2026-02-20 15:48:22 -06:00
/**
 * Fills in missing wake metadata on a context snapshot.
 *
 * The snapshot is mutated IN PLACE and also returned. A key is only written
 * when the snapshot does not already hold a non-empty string for it and the
 * candidate value is truthy; existing values are never overwritten.
 *
 * Keys filled, in order: `wakeReason`, `issueId`, `taskId` (both from the
 * payload's `issueId`), `taskKey`, `commentId`, `wakeCommentId`, `wakeSource`,
 * `wakeTriggerDetail`.
 *
 * @returns The same snapshot object plus the values derived along the way.
 */
function enrichWakeContextSnapshot(input: {
  contextSnapshot: Record<string, unknown>;
  reason: string | null;
  source: WakeupOptions["source"];
  triggerDetail: WakeupOptions["triggerDetail"] | null;
  payload: Record<string, unknown> | null;
}) {
  const { contextSnapshot, reason, source, triggerDetail, payload } = input;
  const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
  const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]);
  // Derived BEFORE any key is filled in, so they reflect the caller's snapshot.
  const taskKey = deriveTaskKey(contextSnapshot, payload);
  const wakeCommentId = deriveCommentId(contextSnapshot, payload);

  const fillIfMissing = (key: string, candidate: string | null | undefined) => {
    if (!readNonEmptyString(contextSnapshot[key]) && candidate) {
      contextSnapshot[key] = candidate;
    }
  };

  fillIfMissing("wakeReason", reason);
  fillIfMissing("issueId", issueIdFromPayload);
  fillIfMissing("taskId", issueIdFromPayload);
  fillIfMissing("taskKey", taskKey);
  fillIfMissing("commentId", commentIdFromPayload);
  fillIfMissing("wakeCommentId", wakeCommentId);
  fillIfMissing("wakeSource", source);
  fillIfMissing("wakeTriggerDetail", triggerDetail);

  return {
    contextSnapshot,
    issueIdFromPayload,
    commentIdFromPayload,
    taskKey,
    wakeCommentId,
  };
}
2026-02-20 10:32:17 -06:00
/**
 * Merges an incoming wake context into an existing (stored) one.
 *
 * Incoming keys win over existing keys. When the incoming context carries a
 * comment id (`wakeCommentId` or `commentId`), both `commentId` and
 * `wakeCommentId` on the result are set to it.
 *
 * @param existingRaw Stored snapshot; parsed with `parseObject`.
 * @param incoming    Context of the wake being coalesced.
 * @returns A new merged object; neither argument is mutated.
 */
function mergeCoalescedContextSnapshot(
  existingRaw: unknown,
  incoming: Record<string, unknown>,
) {
  const existing = parseObject(existingRaw);
  const merged: Record<string, unknown> = {
    ...existing,
    ...incoming,
  };
  const commentId = deriveCommentId(incoming, null);
  if (commentId) {
    merged.commentId = commentId;
    merged.wakeCommentId = commentId;
  }
  return merged;
}
2026-02-19 14:02:17 -06:00
/** Task key of a stored run, derived from its context snapshot only (no payload). */
function runTaskKey(run: typeof heartbeatRuns.$inferSelect) {
  const snapshot = run.contextSnapshot as Record<string, unknown> | null;
  return deriveTaskKey(snapshot, null);
}
/**
 * Compares two task keys, treating `null` and `undefined` as the same
 * "no task" scope.
 */
function isSameTaskScope(left: string | null, right: string | null) {
  return (left ?? null) === (right ?? null);
}
/**
 * Limits a display id to at most `max` UTF-16 code units (default 128).
 * Empty, null and undefined inputs yield null.
 */
function truncateDisplayId(value: string | null | undefined, max = 128) {
  if (!value) return null;
  if (value.length <= max) return value;
  return value.slice(0, max);
}
2026-02-20 15:48:22 -06:00
/**
 * Produces a lookup key from an agent name: trimmed and lower-cased.
 * Returns null for non-strings and for names that are blank after trimming.
 */
function normalizeAgentNameKey(value: string | null | undefined) {
  if (typeof value !== "string") return null;
  const key = value.trim().toLowerCase();
  return key === "" ? null : key;
}
2026-02-19 14:02:17 -06:00
/**
 * Session codec used when an adapter does not provide its own.
 *
 * - `deserialize`: returns the parsed object when it has at least one key;
 *   otherwise falls back to `{ sessionId }` if the raw value exposes a
 *   non-empty `sessionId`; otherwise null.
 * - `serialize`: passes params through, mapping null/empty objects to null.
 * - `getDisplayId`: the params' non-empty `sessionId`, or null.
 */
const defaultSessionCodec: AdapterSessionCodec = {
  deserialize(raw: unknown) {
    const asObj = parseObject(raw);
    if (Object.keys(asObj).length > 0) return asObj;
    const sessionId = readNonEmptyString((raw as Record<string, unknown> | null)?.sessionId);
    if (sessionId) return { sessionId };
    return null;
  },
  serialize(params: Record<string, unknown> | null) {
    if (!params || Object.keys(params).length === 0) return null;
    return params;
  },
  getDisplayId(params: Record<string, unknown> | null) {
    return readNonEmptyString(params?.sessionId);
  },
};
/** Returns the adapter's own session codec, or `defaultSessionCodec` when it has none. */
function getAdapterSessionCodec(adapterType: string) {
  const adapter = getServerAdapter(adapterType);
  return adapter.sessionCodec ?? defaultSessionCodec;
}
/**
 * Collapses "no params" representations: null, undefined and objects with no
 * own enumerable keys all become null; anything else is returned as-is.
 */
function normalizeSessionParams(params: Record<string, unknown> | null | undefined) {
  if (params && Object.keys(params).length > 0) {
    return params;
  }
  return null;
}
/**
 * Computes the session state to persist after an adapter run.
 *
 * - `clearSession` on the result wipes everything (all fields null).
 * - Params: explicit `sessionParams` win; else an explicit `sessionId`
 *   (wrapped as `{ sessionId }`, or null when blank); else the previous
 *   params. The candidate is round-tripped through the codec
 *   (serialize, then deserialize) and empty objects are treated as null.
 * - Display id, first non-null of: explicit display id, codec display id,
 *   deserialized `sessionId`, previous display id*, explicit session id,
 *   previous legacy session id*. Truncated by `truncateDisplayId`.
 * - Legacy session id, first non-null of: explicit session id, deserialized
 *   `sessionId`, display id, previous legacy session id*.
 *
 * (*) previous values are only considered when the adapter result specified
 * none of `sessionParams`, `sessionId`, `sessionDisplayId`.
 */
function resolveNextSessionState(input: {
  codec: AdapterSessionCodec;
  adapterResult: AdapterExecutionResult;
  previousParams: Record<string, unknown> | null;
  previousDisplayId: string | null;
  previousLegacySessionId: string | null;
}) {
  const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input;

  if (adapterResult.clearSession) {
    return {
      params: null as Record<string, unknown> | null,
      displayId: null as string | null,
      legacySessionId: null as string | null,
    };
  }

  const explicitParams = adapterResult.sessionParams;
  // "Explicit" means the field is present at all, even if null/blank.
  const hasExplicitParams = adapterResult.sessionParams !== undefined;
  const hasExplicitSessionId = adapterResult.sessionId !== undefined;
  const explicitSessionId = readNonEmptyString(adapterResult.sessionId);
  const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined;
  const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId);
  const shouldUsePrevious = !hasExplicitParams && !hasExplicitSessionId && !hasExplicitDisplay;

  const candidateParams =
    hasExplicitParams
      ? explicitParams
      : hasExplicitSessionId
        ? (explicitSessionId ? { sessionId: explicitSessionId } : null)
        : previousParams;

  const serialized = normalizeSessionParams(codec.serialize(normalizeSessionParams(candidateParams) ?? null));
  const deserialized = normalizeSessionParams(codec.deserialize(serialized));

  const displayId = truncateDisplayId(
    explicitDisplayId ??
      (codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ??
      readNonEmptyString(deserialized?.sessionId) ??
      (shouldUsePrevious ? previousDisplayId : null) ??
      explicitSessionId ??
      (shouldUsePrevious ? previousLegacySessionId : null),
  );
  const legacySessionId =
    explicitSessionId ??
    readNonEmptyString(deserialized?.sessionId) ??
    displayId ??
    (shouldUsePrevious ? previousLegacySessionId : null);

  return {
    params: serialized,
    displayId,
    legacySessionId,
  };
}
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
export function heartbeatService ( db : Db ) {
2026-02-17 12:24:43 -06:00
const runLogStore = getRunLogStore ( ) ;
2026-02-19 15:43:52 -06:00
const secretsSvc = secretService ( db ) ;
2026-03-15 07:05:01 -05:00
const companySkills = companySkillService ( db ) ;
2026-03-10 10:58:38 -05:00
const issuesSvc = issueService ( db ) ;
2026-03-13 06:56:31 -05:00
const activeRunExecutions = new Set < string > ( ) ;
2026-02-17 12:24:43 -06:00
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
/** Loads the agent row with the given id, or null when there is none. */
async function getAgent(agentId: string) {
  const rows = await db
    .select()
    .from(agents)
    .where(eq(agents.id, agentId));
  return rows[0] ?? null;
}
2026-02-17 12:24:43 -06:00
/** Loads the heartbeat run row with the given id, or null when there is none. */
async function getRun(runId: string) {
  const rows = await db
    .select()
    .from(heartbeatRuns)
    .where(eq(heartbeatRuns.id, runId));
  return rows[0] ?? null;
}
/** Loads the runtime-state row for an agent, or null when there is none. */
async function getRuntimeState(agentId: string) {
  const rows = await db
    .select()
    .from(agentRuntimeState)
    .where(eq(agentRuntimeState.agentId, agentId));
  return rows[0] ?? null;
}
2026-02-19 14:02:17 -06:00
/**
 * Loads the task session row matching company, agent, adapter type and task
 * key, or null when there is none.
 */
async function getTaskSession(
  companyId: string,
  agentId: string,
  adapterType: string,
  taskKey: string,
) {
  const rows = await db
    .select()
    .from(agentTaskSessions)
    .where(
      and(
        eq(agentTaskSessions.companyId, companyId),
        eq(agentTaskSessions.agentId, agentId),
        eq(agentTaskSessions.adapterType, adapterType),
        eq(agentTaskSessions.taskKey, taskKey),
      ),
    );
  return rows[0] ?? null;
}
2026-03-13 08:49:11 -05:00
/**
 * Finds the most recently created run of an agent whose `sessionIdAfter`
 * equals `sessionId`.
 *
 * @param opts.excludeRunId When set, that run is left out of the search.
 * @returns The newest matching run row, or null when none matches.
 */
async function getLatestRunForSession(
  agentId: string,
  sessionId: string,
  opts?: { excludeRunId?: string | null },
) {
  const conditions = [
    eq(heartbeatRuns.agentId, agentId),
    eq(heartbeatRuns.sessionIdAfter, sessionId),
  ];
  if (opts?.excludeRunId) {
    conditions.push(sql`${heartbeatRuns.id} <> ${opts.excludeRunId}`);
  }
  const rows = await db
    .select()
    .from(heartbeatRuns)
    .where(and(...conditions))
    // The id tie-breaker keeps the pick deterministic when two runs share a
    // createdAt value (same approach as getOldestRunForSession).
    .orderBy(desc(heartbeatRuns.createdAt), desc(heartbeatRuns.id))
    .limit(1);
  return rows[0] ?? null;
}
2026-03-13 10:18:00 -05:00
/**
 * Finds the earliest run of an agent whose `sessionIdAfter` equals
 * `sessionId`, ordered by `createdAt` then `id`.
 *
 * @returns `{ id, createdAt }` of that run, or null when none matches.
 */
async function getOldestRunForSession(agentId: string, sessionId: string) {
  const rows = await db
    .select({
      id: heartbeatRuns.id,
      createdAt: heartbeatRuns.createdAt,
    })
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.sessionIdAfter, sessionId)))
    .orderBy(asc(heartbeatRuns.createdAt), asc(heartbeatRuns.id))
    .limit(1);
  return rows[0] ?? null;
}
2026-03-13 08:49:11 -05:00
/**
 * Converts a run's raw usage into per-run usage by subtracting the raw totals
 * of the previous run in the same session.
 *
 * Without a session id or raw usage the raw usage is returned unchanged.
 * Otherwise the latest other run with the same `sessionIdAfter` is looked up
 * and `deriveNormalizedUsageDelta` is applied against its raw totals.
 *
 * @returns
 *  - `normalizedUsage`: the per-run usage (or the raw usage when no delta was possible);
 *  - `previousRawUsage`: raw totals of the previous run, if any were found;
 *  - `derivedFromSessionTotals`: true when previous totals were found.
 */
async function resolveNormalizedUsageForSession(input: {
  agentId: string;
  runId: string;
  sessionId: string | null;
  rawUsage: UsageTotals | null;
}) {
  const { agentId, runId, sessionId, rawUsage } = input;
  if (!sessionId || !rawUsage) {
    return {
      normalizedUsage: rawUsage,
      previousRawUsage: null as UsageTotals | null,
      derivedFromSessionTotals: false,
    };
  }

  const previousRun = await getLatestRunForSession(agentId, sessionId, { excludeRunId: runId });
  const previousRawUsage = readRawUsageTotals(previousRun?.usageJson);
  return {
    normalizedUsage: deriveNormalizedUsageDelta(rawUsage, previousRawUsage),
    previousRawUsage,
    derivedFromSessionTotals: previousRawUsage !== null,
  };
}
/**
 * Decides whether an agent's current session should be rotated and, if so,
 * prepares a handoff note for the next session.
 *
 * No rotation is proposed when there is no session id, when the agent's
 * compaction policy is disabled, or when no run is recorded for the session.
 *
 * Otherwise the first matching rule (in this order) triggers rotation; a limit
 * of 0 disables its rule:
 *  1. more runs in the session than `maxSessionRuns`;
 *  2. the latest run's raw input tokens >= `maxRawInputTokens`;
 *  3. hours between the oldest and the latest run's `createdAt` >=
 *     `maxSessionAgeHours` (measured between runs, not against the clock).
 *
 * @returns A decision whose `previousRunId` is the latest run's id whenever
 *          at least one run was found, regardless of `rotate`.
 */
async function evaluateSessionCompaction(input: {
  agent: typeof agents.$inferSelect;
  sessionId: string | null;
  issueId: string | null;
}): Promise<SessionCompactionDecision> {
  const { agent, sessionId, issueId } = input;
  const noRotation: SessionCompactionDecision = {
    rotate: false,
    reason: null,
    handoffMarkdown: null,
    previousRunId: null,
  };
  if (!sessionId) return noRotation;

  const policy = parseSessionCompactionPolicy(agent);
  if (!policy.enabled) return noRotation;

  // One row more than the run limit is enough to detect "exceeded"; at least
  // four rows are always fetched.
  const fetchLimit = Math.max(policy.maxSessionRuns > 0 ? policy.maxSessionRuns + 1 : 0, 4);
  const runs = await db
    .select({
      id: heartbeatRuns.id,
      createdAt: heartbeatRuns.createdAt,
      usageJson: heartbeatRuns.usageJson,
      resultJson: heartbeatRuns.resultJson,
      error: heartbeatRuns.error,
    })
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agent.id), eq(heartbeatRuns.sessionIdAfter, sessionId)))
    .orderBy(desc(heartbeatRuns.createdAt))
    .limit(fetchLimit);
  if (runs.length === 0) return noRotation;

  const latestRun = runs[0] ?? null;
  // The fetched window may not reach back to the session's first run, so the
  // true oldest run is queried separately when the age rule is active.
  const oldestRun =
    policy.maxSessionAgeHours > 0
      ? await getOldestRunForSession(agent.id, sessionId)
      : (runs[runs.length - 1] ?? latestRun);
  const latestRawUsage = readRawUsageTotals(latestRun?.usageJson);
  const sessionAgeHours =
    latestRun && oldestRun
      ? Math.max(
          0,
          (new Date(latestRun.createdAt).getTime() - new Date(oldestRun.createdAt).getTime()) / (1000 * 60 * 60),
        )
      : 0;

  let reason: string | null = null;
  if (policy.maxSessionRuns > 0 && runs.length > policy.maxSessionRuns) {
    reason = `session exceeded ${policy.maxSessionRuns} runs`;
  } else if (
    policy.maxRawInputTokens > 0 &&
    latestRawUsage &&
    latestRawUsage.inputTokens >= policy.maxRawInputTokens
  ) {
    reason =
      `session raw input reached ${formatCount(latestRawUsage.inputTokens)} tokens ` +
      `(threshold ${formatCount(policy.maxRawInputTokens)})`;
  } else if (policy.maxSessionAgeHours > 0 && sessionAgeHours >= policy.maxSessionAgeHours) {
    reason = `session age reached ${Math.floor(sessionAgeHours)} hours`;
  }

  if (!reason || !latestRun) {
    return {
      rotate: false,
      reason: null,
      handoffMarkdown: null,
      previousRunId: latestRun?.id ?? null,
    };
  }

  const latestSummary = summarizeHeartbeatRunResultJson(latestRun.resultJson);
  const latestTextSummary =
    readNonEmptyString(latestSummary?.summary) ??
    readNonEmptyString(latestSummary?.result) ??
    readNonEmptyString(latestSummary?.message) ??
    readNonEmptyString(latestRun.error);

  // Optional lines are emitted as "" and dropped by filter(Boolean).
  const handoffMarkdown = [
    "Paperclip session handoff:",
    `- Previous session: ${sessionId}`,
    issueId ? `- Issue: ${issueId}` : "",
    `- Rotation reason: ${reason}`,
    latestTextSummary ? `- Last run summary: ${latestTextSummary}` : "",
    "Continue from the current task state. Rebuild only the minimum context you need.",
  ]
    .filter(Boolean)
    .join("\n");

  return {
    rotate: true,
    reason,
    handoffMarkdown,
    previousRunId: latestRun.id,
  };
}
2026-02-20 15:48:22 -06:00
/**
 * Determines the session id an agent would start from for a wakeup.
 *
 * With a task key, the stored task session for (company, agent, adapter type,
 * task key) is consulted and the first non-null of its `sessionDisplayId`,
 * the codec's display id for its params, and the params' `sessionId` is
 * returned (truncated by `truncateDisplayId`).
 * Without a task key, the agent's runtime-state `sessionId` is returned.
 *
 * @returns The session id, or null when none is known.
 */
async function resolveSessionBeforeForWakeup(
  agent: typeof agents.$inferSelect,
  taskKey: string | null,
) {
  if (taskKey) {
    const codec = getAdapterSessionCodec(agent.adapterType);
    const existingTaskSession = await getTaskSession(
      agent.companyId,
      agent.id,
      agent.adapterType,
      taskKey,
    );
    const parsedParams = normalizeSessionParams(
      codec.deserialize(existingTaskSession?.sessionParamsJson ?? null),
    );
    return truncateDisplayId(
      existingTaskSession?.sessionDisplayId ??
        (codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ??
        readNonEmptyString(parsedParams?.sessionId),
    );
  }

  const runtimeForRun = await getRuntimeState(agent.id);
  return runtimeForRun?.sessionId ?? null;
}
2026-02-25 08:38:58 -06:00
/**
 * Chooses the working directory and workspace metadata for a run.
 *
 * Resolution order:
 *  1. The first project workspace (ordered by createdAt, then id) whose
 *     configured cwd exists on disk as a directory.
 *  2. If the project has workspace rows but none is usable, the agent's
 *     default workspace directory, still reported as "project_primary" and
 *     carrying the first row's workspace/repo metadata plus a warning.
 *  3. The cwd stored in the previous session params, if it still exists.
 *  4. The agent's default workspace directory ("agent_home"), with a warning
 *     that explains why nothing better was available.
 *
 * @param agent Agent row the run belongs to; its companyId scopes all lookups.
 * @param context Run context; `issueId` and `projectId` are read from it.
 * @param previousSessionParams Params of the session being resumed, or null.
 * @param opts Project workspaces are skipped only when `useProjectWorkspace`
 *   is exactly `false`.
 * @returns The resolved workspace, including hints for every project
 *   workspace row and any warnings to surface.
 */
async function resolveWorkspaceForRun(
  agent: typeof agents.$inferSelect,
  context: Record<string, unknown>,
  previousSessionParams: Record<string, unknown> | null,
  opts?: { useProjectWorkspace?: boolean | null },
): Promise<ResolvedWorkspaceForRun> {
  // True only when the path exists and is a directory; any stat failure counts as missing.
  const isExistingDirectory = async (candidate: string): Promise<boolean> => {
    try {
      const stats = await fs.stat(candidate);
      return stats.isDirectory();
    } catch {
      return false;
    }
  };

  const issueId = readNonEmptyString(context.issueId);
  const contextProjectId = readNonEmptyString(context.projectId);

  // The project recorded on the issue takes precedence over the context's project id.
  let issueProjectId: string | null = null;
  if (issueId) {
    const issueRows = await db
      .select({ projectId: issues.projectId })
      .from(issues)
      .where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId)));
    issueProjectId = issueRows[0]?.projectId ?? null;
  }
  const resolvedProjectId = issueProjectId ?? contextProjectId;

  // Only an explicit `false` opts out of project workspaces.
  const workspaceProjectId = opts?.useProjectWorkspace === false ? null : resolvedProjectId;

  const projectWorkspaceRows = workspaceProjectId
    ? await db
        .select()
        .from(projectWorkspaces)
        .where(
          and(
            eq(projectWorkspaces.companyId, agent.companyId),
            eq(projectWorkspaces.projectId, workspaceProjectId),
          ),
        )
        .orderBy(asc(projectWorkspaces.createdAt), asc(projectWorkspaces.id))
    : [];

  const workspaceHints = projectWorkspaceRows.map((row) => ({
    workspaceId: row.id,
    cwd: readNonEmptyString(row.cwd),
    repoUrl: readNonEmptyString(row.repoUrl),
    repoRef: readNonEmptyString(row.repoRef),
  }));

  if (projectWorkspaceRows.length > 0) {
    const unavailableCwds: string[] = [];
    let sawConfiguredCwd = false;

    for (const workspace of projectWorkspaceRows) {
      const projectCwd = readNonEmptyString(workspace.cwd);
      // Rows without a cwd, or marked with the repo-only sentinel, have no local directory to try.
      if (projectCwd && projectCwd !== REPO_ONLY_CWD_SENTINEL) {
        sawConfiguredCwd = true;
        if (await isExistingDirectory(projectCwd)) {
          return {
            cwd: projectCwd,
            source: "project_primary" as const,
            projectId: resolvedProjectId,
            workspaceId: workspace.id,
            repoUrl: workspace.repoUrl,
            repoRef: workspace.repoRef,
            workspaceHints,
            warnings: [],
          };
        }
        unavailableCwds.push(projectCwd);
      }
    }

    // No usable project directory: fall back to the agent's own directory.
    const fallbackCwd = resolveDefaultAgentWorkspaceDir(agent.id);
    await fs.mkdir(fallbackCwd, { recursive: true });

    const warnings: string[] = [];
    const firstMissing = unavailableCwds[0];
    if (unavailableCwds.length > 1) {
      const extraMissingCount = unavailableCwds.length - 1;
      warnings.push(
        `Project workspace path "${firstMissing}" and ${extraMissingCount} other configured path(s) are not available yet. Using fallback workspace "${fallbackCwd}" for this run.`,
      );
    } else if (unavailableCwds.length === 1) {
      warnings.push(
        `Project workspace path "${firstMissing}" is not available yet. Using fallback workspace "${fallbackCwd}" for this run.`,
      );
    } else if (!sawConfiguredCwd) {
      warnings.push(
        `Project workspace has no local cwd configured. Using fallback workspace "${fallbackCwd}" for this run.`,
      );
    }

    const primaryWorkspace = projectWorkspaceRows[0];
    return {
      cwd: fallbackCwd,
      source: "project_primary" as const,
      projectId: resolvedProjectId,
      workspaceId: primaryWorkspace?.id ?? null,
      repoUrl: primaryWorkspace?.repoUrl ?? null,
      repoRef: primaryWorkspace?.repoRef ?? null,
      workspaceHints,
      warnings,
    };
  }

  // No project workspace rows: try the directory the previous session ran in.
  const sessionCwd = readNonEmptyString(previousSessionParams?.cwd);
  if (sessionCwd && (await isExistingDirectory(sessionCwd))) {
    return {
      cwd: sessionCwd,
      source: "task_session" as const,
      projectId: resolvedProjectId,
      workspaceId: readNonEmptyString(previousSessionParams?.workspaceId),
      repoUrl: readNonEmptyString(previousSessionParams?.repoUrl),
      repoRef: readNonEmptyString(previousSessionParams?.repoRef),
      workspaceHints,
      warnings: [],
    };
  }

  // Last resort: the agent's default workspace directory.
  const cwd = resolveDefaultAgentWorkspaceDir(agent.id);
  await fs.mkdir(cwd, { recursive: true });

  let fallbackReason: string;
  if (sessionCwd) {
    fallbackReason = `Saved session workspace "${sessionCwd}" is not available. Using fallback workspace "${cwd}" for this run.`;
  } else if (resolvedProjectId) {
    fallbackReason = `No project workspace directory is currently available for this issue. Using fallback workspace "${cwd}" for this run.`;
  } else {
    fallbackReason = `No project or prior session workspace was available. Using fallback workspace "${cwd}" for this run.`;
  }

  return {
    cwd,
    source: "agent_home" as const,
    projectId: resolvedProjectId,
    workspaceId: null,
    repoUrl: null,
    repoRef: null,
    workspaceHints,
    warnings: [fallbackReason],
  };
}
2026-02-19 14:02:17 -06:00
/**
 * Creates or updates the task session identified by
 * (companyId, agentId, adapterType, taskKey).
 *
 * An existing row is looked up first. If found, only its session fields and
 * `updatedAt` are rewritten; otherwise a new row is inserted.
 *
 * @returns The stored row, or null when the statement returned no row.
 */
async function upsertTaskSession(input: {
  companyId: string;
  agentId: string;
  adapterType: string;
  taskKey: string;
  sessionParamsJson: Record<string, unknown> | null;
  sessionDisplayId: string | null;
  lastRunId: string | null;
  lastError: string | null;
}) {
  // Fields that are written on both the update and the insert path.
  const sessionFields = {
    sessionParamsJson: input.sessionParamsJson,
    sessionDisplayId: input.sessionDisplayId,
    lastRunId: input.lastRunId,
    lastError: input.lastError,
  };

  const current = await getTaskSession(
    input.companyId,
    input.agentId,
    input.adapterType,
    input.taskKey,
  );

  if (current) {
    const [updatedRow] = await db
      .update(agentTaskSessions)
      .set({ ...sessionFields, updatedAt: new Date() })
      .where(eq(agentTaskSessions.id, current.id))
      .returning();
    return updatedRow ?? null;
  }

  const [insertedRow] = await db
    .insert(agentTaskSessions)
    .values({
      companyId: input.companyId,
      agentId: input.agentId,
      adapterType: input.adapterType,
      taskKey: input.taskKey,
      ...sessionFields,
    })
    .returning();
  return insertedRow ?? null;
}
/**
 * Deletes task sessions belonging to an agent within a company.
 *
 * @param opts Optional narrowing filters; a falsy `taskKey` or `adapterType`
 *   (null, undefined or empty string) is ignored.
 * @returns The number of rows deleted.
 */
async function clearTaskSessions(
  companyId: string,
  agentId: string,
  opts?: { taskKey?: string | null; adapterType?: string | null },
) {
  const taskKey = opts?.taskKey;
  const adapterType = opts?.adapterType;

  const filters = [
    eq(agentTaskSessions.companyId, companyId),
    eq(agentTaskSessions.agentId, agentId),
    ...(taskKey ? [eq(agentTaskSessions.taskKey, taskKey)] : []),
    ...(adapterType ? [eq(agentTaskSessions.adapterType, adapterType)] : []),
  ];

  const deletedRows = await db
    .delete(agentTaskSessions)
    .where(and(...filters))
    .returning();
  return deletedRows.length;
}
2026-02-17 12:24:43 -06:00
/**
 * Returns the agent's runtime state row, inserting one with an empty
 * `stateJson` when none exists yet.
 */
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
  const current = await getRuntimeState(agent.id);
  if (current) {
    return current;
  }

  const [created] = await db
    .insert(agentRuntimeState)
    .values({
      agentId: agent.id,
      companyId: agent.companyId,
      adapterType: agent.adapterType,
      stateJson: {},
    })
    .returning();
  return created;
}
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
/**
 * Writes a new status (plus any extra column values) to a heartbeat run and
 * publishes a "heartbeat.run.status" live event for the updated row.
 *
 * `patch` is spread after `status`, so a `status` key inside the patch wins;
 * `updatedAt` is always set to the current time.
 *
 * @returns The updated row, or null when no run has the given id (in which
 *   case no event is published).
 */
async function setRunStatus(
  runId: string,
  status: string,
  patch?: Partial<typeof heartbeatRuns.$inferInsert>,
) {
  const toIsoOrNull = (value: Date | string | number | null | undefined) =>
    value ? new Date(value).toISOString() : null;

  const [row] = await db
    .update(heartbeatRuns)
    .set({ status, ...patch, updatedAt: new Date() })
    .where(eq(heartbeatRuns.id, runId))
    .returning();
  const updated = row ?? null;
  if (!updated) {
    return updated;
  }

  publishLiveEvent({
    companyId: updated.companyId,
    type: "heartbeat.run.status",
    payload: {
      runId: updated.id,
      agentId: updated.agentId,
      status: updated.status,
      invocationSource: updated.invocationSource,
      triggerDetail: updated.triggerDetail,
      error: updated.error ?? null,
      errorCode: updated.errorCode ?? null,
      startedAt: toIsoOrNull(updated.startedAt),
      finishedAt: toIsoOrNull(updated.finishedAt),
    },
  });
  return updated;
}
2026-02-17 12:24:43 -06:00
/**
 * Updates the status (and optional extra columns) of a wakeup request.
 * Does nothing when no wakeup request id is supplied.
 */
async function setWakeupStatus(
  wakeupRequestId: string | null | undefined,
  status: string,
  patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
) {
  if (wakeupRequestId) {
    const changes = { status, ...patch, updatedAt: new Date() };
    await db
      .update(agentWakeupRequests)
      .set(changes)
      .where(eq(agentWakeupRequests.id, wakeupRequestId));
  }
}
/**
 * Persists one event for a run and then broadcasts it as a
 * "heartbeat.run.event" live event.
 *
 * The message and payload are passed through the current-user redaction
 * helpers before being stored or published. Optional fields that are absent
 * are published as null.
 *
 * @param run Run the event belongs to.
 * @param seq Sequence number stored with the event.
 * @param event Event details.
 */
async function appendRunEvent(
  run: typeof heartbeatRuns.$inferSelect,
  seq: number,
  event: {
    eventType: string;
    stream?: "system" | "stdout" | "stderr";
    level?: "info" | "warn" | "error";
    color?: string;
    message?: string;
    payload?: Record<string, unknown>;
  },
) {
  const { eventType, stream, level, color } = event;
  // Falsy message/payload values are passed through untouched.
  const message = event.message ? redactCurrentUserText(event.message) : event.message;
  const payload = event.payload ? redactCurrentUserValue(event.payload) : event.payload;

  await db.insert(heartbeatRunEvents).values({
    companyId: run.companyId,
    runId: run.id,
    agentId: run.agentId,
    seq,
    eventType,
    stream,
    level,
    color,
    message,
    payload,
  });

  publishLiveEvent({
    companyId: run.companyId,
    type: "heartbeat.run.event",
    payload: {
      runId: run.id,
      agentId: run.agentId,
      seq,
      eventType,
      stream: stream ?? null,
      level: level ?? null,
      color: color ?? null,
      message: message ?? null,
      payload: payload ?? null,
    },
  });
}
/**
 * Reads the heartbeat policy from `agent.runtimeConfig.heartbeat`.
 *
 * Defaults: `enabled` true, `intervalSec` 0 (never negative), `wakeOnDemand`
 * true. `wakeOnDemand` is taken from the first defined key among
 * `wakeOnDemand`, `wakeOnAssignment`, `wakeOnOnDemand` and `wakeOnAutomation`.
 */
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
  const heartbeat = parseObject(parseObject(agent.runtimeConfig).heartbeat);

  const enabled = asBoolean(heartbeat.enabled, true);
  const intervalSec = Math.max(0, asNumber(heartbeat.intervalSec, 0));
  const wakeOnDemandSetting =
    heartbeat.wakeOnDemand ??
    heartbeat.wakeOnAssignment ??
    heartbeat.wakeOnOnDemand ??
    heartbeat.wakeOnAutomation;
  const wakeOnDemand = asBoolean(wakeOnDemandSetting, true);
  const maxConcurrentRuns = normalizeMaxConcurrentRuns(heartbeat.maxConcurrentRuns);

  return { enabled, intervalSec, wakeOnDemand, maxConcurrentRuns };
}
2026-02-20 12:50:34 -06:00
/** Counts the heartbeat runs of an agent whose status is "running". */
async function countRunningRunsForAgent(agentId: string) {
  const runningForAgent = and(
    eq(heartbeatRuns.agentId, agentId),
    eq(heartbeatRuns.status, "running"),
  );
  const [tally] = await db
    .select({ count: sql<number>`count(*)` })
    .from(heartbeatRuns)
    .where(runningForAgent);
  // Coerce explicitly: the value may not arrive as a JS number.
  return Number(tally.count ?? 0);
}
2026-02-20 15:48:22 -06:00
/**
 * Moves a queued run to "running".
 *
 * The UPDATE only matches while the row is still "queued", so when something
 * else changed the row first nothing is updated and null is returned. A run
 * that is not queued to begin with is returned unchanged.
 *
 * On success a "heartbeat.run.status" event is published and the linked
 * wakeup request (if any) is marked "claimed".
 */
async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) {
  if (run.status !== "queued") {
    return run;
  }

  const toIsoOrNull = (value: Date | string | number | null | undefined) =>
    value ? new Date(value).toISOString() : null;

  const claimedAt = new Date();
  const [row] = await db
    .update(heartbeatRuns)
    .set({
      status: "running",
      startedAt: run.startedAt ?? claimedAt,
      updatedAt: claimedAt,
    })
    .where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued")))
    .returning();
  const claimed = row ?? null;
  if (!claimed) {
    return null;
  }

  publishLiveEvent({
    companyId: claimed.companyId,
    type: "heartbeat.run.status",
    payload: {
      runId: claimed.id,
      agentId: claimed.agentId,
      status: claimed.status,
      invocationSource: claimed.invocationSource,
      triggerDetail: claimed.triggerDetail,
      error: claimed.error ?? null,
      errorCode: claimed.errorCode ?? null,
      startedAt: toIsoOrNull(claimed.startedAt),
      finishedAt: toIsoOrNull(claimed.finishedAt),
    },
  });

  await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });
  return claimed;
}
2026-02-17 12:24:43 -06:00
/**
 * Updates an agent's status after one of its runs has finished and publishes
 * an "agent.status" live event.
 *
 * Agents that are missing, paused or terminated are left untouched. Otherwise
 * the agent stays "running" while other runs are still running, becomes
 * "idle" after a succeeded or cancelled run, and "error" after any other
 * outcome. `lastHeartbeatAt` is refreshed in every case.
 */
async function finalizeAgentStatus(
  agentId: string,
  outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
) {
  const agent = await getAgent(agentId);
  if (!agent || agent.status === "paused" || agent.status === "terminated") {
    return;
  }

  const runningCount = await countRunningRunsForAgent(agentId);
  let nextStatus: "running" | "idle" | "error";
  if (runningCount > 0) {
    nextStatus = "running";
  } else if (outcome === "succeeded" || outcome === "cancelled") {
    nextStatus = "idle";
  } else {
    nextStatus = "error";
  }

  const [row] = await db
    .update(agents)
    .set({
      status: nextStatus,
      lastHeartbeatAt: new Date(),
      updatedAt: new Date(),
    })
    .where(eq(agents.id, agentId))
    .returning();
  const updated = row ?? null;
  if (!updated) {
    return;
  }

  const lastHeartbeatAt = updated.lastHeartbeatAt
    ? new Date(updated.lastHeartbeatAt).toISOString()
    : null;
  publishLiveEvent({
    companyId: updated.companyId,
    type: "agent.status",
    payload: {
      agentId: updated.id,
      status: updated.status,
      lastHeartbeatAt,
      outcome,
    },
  });
}
2026-02-19 09:09:40 -06:00
/**
 * Fails runs that are marked "running" in the database but are tracked by
 * neither `runningProcesses` nor `activeRunExecutions`.
 *
 * For each such run: the run and its wakeup request are marked failed, a
 * lifecycle error event is appended, the issue execution is released, the
 * agent status is finalized, and the agent's next queued run is started.
 *
 * @param opts.staleThresholdMs When greater than 0, runs updated more
 *   recently than this many milliseconds ago are left alone. Defaults to 0
 *   (no staleness check).
 * @returns The number of reaped runs and their ids.
 */
async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) {
  const processLostMessage = "Process lost -- server may have restarted";
  const staleThresholdMs = opts?.staleThresholdMs ?? 0;
  const now = new Date();

  // Only "running" rows are candidates: queued runs are legitimately waiting
  // and are picked up by resumeQueuedRuns.
  const runningRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(eq(heartbeatRuns.status, "running"));

  const reapedRunIds: string[] = [];

  for (const run of runningRuns) {
    const isTracked = runningProcesses.has(run.id) || activeRunExecutions.has(run.id);
    if (isTracked) continue;

    // Skip recently updated runs to avoid false positives.
    if (staleThresholdMs > 0) {
      const lastUpdatedMs = run.updatedAt ? new Date(run.updatedAt).getTime() : 0;
      const ageMs = now.getTime() - lastUpdatedMs;
      if (ageMs < staleThresholdMs) continue;
    }

    await setRunStatus(run.id, "failed", {
      error: processLostMessage,
      errorCode: "process_lost",
      finishedAt: now,
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: now,
      error: processLostMessage,
    });

    const failedRun = await getRun(run.id);
    if (failedRun) {
      await appendRunEvent(failedRun, 1, {
        eventType: "lifecycle",
        stream: "system",
        level: "error",
        message: processLostMessage,
      });
      await releaseIssueExecutionAndPromote(failedRun);
    }

    await finalizeAgentStatus(run.agentId, "failed");
    await startNextQueuedRunForAgent(run.agentId);
    runningProcesses.delete(run.id);
    reapedRunIds.push(run.id);
  }

  const reapedCount = reapedRunIds.length;
  if (reapedCount > 0) {
    logger.warn({ reapedCount, runIds: reapedRunIds }, "reaped orphaned heartbeat runs");
  }
  return { reaped: reapedCount, runIds: reapedRunIds };
}
2026-03-13 06:56:31 -05:00
/**
 * Starts queued work for every agent that has at least one queued run.
 * Agents are handled one at a time, in first-seen order.
 */
async function resumeQueuedRuns() {
  const queuedRows = await db
    .select({ agentId: heartbeatRuns.agentId })
    .from(heartbeatRuns)
    .where(eq(heartbeatRuns.status, "queued"));

  // An agent may have several queued runs; visit each agent only once.
  const pendingAgentIds = new Set<string>();
  for (const row of queuedRows) {
    pendingAgentIds.add(row.agentId);
  }

  for (const agentId of pendingAgentIds) {
    await startNextQueuedRunForAgent(agentId);
  }
}
2026-02-17 12:24:43 -06:00
/**
 * Records the outcome of a run on the agent's runtime state and, when the run
 * reported any cost or token usage, creates a cost event.
 *
 * Token and cost totals are incremented in SQL (`column + delta`) rather than
 * being read and written back.
 *
 * @param agent Agent the run belongs to.
 * @param run The run; its id and status are stored as the last run.
 * @param result Adapter result supplying usage, cost, provider, model and error.
 * @param session `legacySessionId` is stored as the runtime state's sessionId.
 * @param normalizedUsage Pre-normalized usage totals; when null or undefined,
 *   `result.usage` is normalized instead.
 */
async function updateRuntimeState(
  agent: typeof agents.$inferSelect,
  run: typeof heartbeatRuns.$inferSelect,
  result: AdapterExecutionResult,
  session: { legacySessionId: string | null },
  normalizedUsage?: UsageTotals | null,
) {
  // Make sure the row exists so the UPDATE below has something to match.
  await ensureRuntimeState(agent);

  const usage = normalizedUsage ?? normalizeUsageTotals(result.usage);
  const inputTokens = usage?.inputTokens ?? 0;
  const outputTokens = usage?.outputTokens ?? 0;
  const cachedInputTokens = usage?.cachedInputTokens ?? 0;

  // A non-finite cost (NaN/Infinity) would survive Math.round/Math.max as NaN
  // and end up bound into the SQL increment, so treat it as "no cost".
  const reportedCostUsd = result.costUsd ?? 0;
  const additionalCostCents = Number.isFinite(reportedCostUsd)
    ? Math.max(0, Math.round(reportedCostUsd * 100))
    : 0;
  const hasTokenUsage = inputTokens > 0 || outputTokens > 0 || cachedInputTokens > 0;

  await db
    .update(agentRuntimeState)
    .set({
      adapterType: agent.adapterType,
      sessionId: session.legacySessionId,
      lastRunId: run.id,
      lastRunStatus: run.status,
      lastError: result.errorMessage ?? null,
      totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`,
      totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`,
      totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`,
      totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`,
      updatedAt: new Date(),
    })
    .where(eq(agentRuntimeState.agentId, agent.id));

  // Skip the cost event entirely for runs that reported neither cost nor tokens.
  if (additionalCostCents > 0 || hasTokenUsage) {
    const costs = costService(db);
    await costs.createEvent(agent.companyId, {
      agentId: agent.id,
      provider: result.provider ?? "unknown",
      model: result.model ?? "unknown",
      inputTokens,
      outputTokens,
      costCents: additionalCostCents,
      occurredAt: new Date(),
    });
  }
}
2026-02-19 14:02:17 -06:00
/**
 * Starts as many of the agent's queued runs as its concurrency limit allows.
 *
 * Runs under the per-agent start lock. The oldest queued runs are claimed
 * first; each claimed run is then executed in the background without being
 * awaited, and execution failures are logged.
 *
 * @returns The runs that were claimed; empty when the agent is unknown, has
 *   no free slot, or nothing could be claimed.
 */
async function startNextQueuedRunForAgent(agentId: string) {
  return withAgentStartLock(agentId, async () => {
    const agent = await getAgent(agentId);
    if (!agent) return [];

    const { maxConcurrentRuns } = parseHeartbeatPolicy(agent);
    const runningCount = await countRunningRunsForAgent(agentId);
    const availableSlots = Math.max(0, maxConcurrentRuns - runningCount);
    if (availableSlots <= 0) return [];

    const candidates = await db
      .select()
      .from(heartbeatRuns)
      .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
      .orderBy(asc(heartbeatRuns.createdAt))
      .limit(availableSlots);

    // A claim can fail if the run is no longer queued; keep only the successful ones.
    const claimedRuns: Array<typeof heartbeatRuns.$inferSelect> = [];
    for (const candidate of candidates) {
      const claimed = await claimQueuedRun(candidate);
      if (claimed) {
        claimedRuns.push(claimed);
      }
    }
    if (claimedRuns.length === 0) return [];

    // Fire and forget: execution continues after this function returns.
    claimedRuns.forEach((claimedRun) => {
      void executeRun(claimedRun.id).catch((err) => {
        logger.error({ err, runId: claimedRun.id }, "queued heartbeat execution failed");
      });
    });

    return claimedRuns;
  });
}
2026-02-17 12:24:43 -06:00
async function executeRun ( runId : string ) {
2026-02-20 12:50:34 -06:00
let run = await getRun ( runId ) ;
2026-02-17 12:24:43 -06:00
if ( ! run ) return ;
if ( run . status !== "queued" && run . status !== "running" ) return ;
2026-02-20 12:50:34 -06:00
if ( run . status === "queued" ) {
2026-02-20 15:48:22 -06:00
const claimed = await claimQueuedRun ( run ) ;
2026-02-20 12:50:34 -06:00
if ( ! claimed ) {
// Another worker has already claimed or finalized this run.
return ;
}
run = claimed ;
}
2026-03-07 12:37:15 -05:00
activeRunExecutions . add ( run . id ) ;
try {
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
const agent = await getAgent ( run . agentId ) ;
if ( ! agent ) {
await setRunStatus ( runId , "failed" , {
error : "Agent not found" ,
2026-02-17 12:24:43 -06:00
errorCode : "agent_not_found" ,
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
finishedAt : new Date ( ) ,
} ) ;
2026-02-17 12:24:43 -06:00
await setWakeupStatus ( run . wakeupRequestId , "failed" , {
finishedAt : new Date ( ) ,
error : "Agent not found" ,
} ) ;
2026-02-20 15:48:22 -06:00
const failedRun = await getRun ( runId ) ;
if ( failedRun ) await releaseIssueExecutionAndPromote ( failedRun ) ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
return ;
}
2026-02-17 12:24:43 -06:00
const runtime = await ensureRuntimeState ( agent ) ;
2026-02-19 14:02:17 -06:00
const context = parseObject ( run . contextSnapshot ) ;
const taskKey = deriveTaskKey ( context , null ) ;
const sessionCodec = getAdapterSessionCodec ( agent . adapterType ) ;
2026-02-26 10:32:44 -06:00
const issueId = readNonEmptyString ( context . issueId ) ;
const issueAssigneeConfig = issueId
? await db
. select ( {
2026-03-10 09:03:31 -05:00
projectId : issues.projectId ,
2026-02-26 10:32:44 -06:00
assigneeAgentId : issues.assigneeAgentId ,
assigneeAdapterOverrides : issues.assigneeAdapterOverrides ,
2026-03-10 09:03:31 -05:00
executionWorkspaceSettings : issues.executionWorkspaceSettings ,
2026-02-26 10:32:44 -06:00
} )
. from ( issues )
. where ( and ( eq ( issues . id , issueId ) , eq ( issues . companyId , agent . companyId ) ) )
. then ( ( rows ) = > rows [ 0 ] ? ? null )
: null ;
const issueAssigneeOverrides =
issueAssigneeConfig && issueAssigneeConfig . assigneeAgentId === agent . id
? parseIssueAssigneeAdapterOverrides (
issueAssigneeConfig . assigneeAdapterOverrides ,
)
: null ;
2026-03-10 09:03:31 -05:00
const issueExecutionWorkspaceSettings = parseIssueExecutionWorkspaceSettings (
issueAssigneeConfig ? . executionWorkspaceSettings ,
) ;
const contextProjectId = readNonEmptyString ( context . projectId ) ;
const executionProjectId = issueAssigneeConfig ? . projectId ? ? contextProjectId ;
const projectExecutionWorkspacePolicy = executionProjectId
? await db
. select ( { executionWorkspacePolicy : projects.executionWorkspacePolicy } )
. from ( projects )
. where ( and ( eq ( projects . id , executionProjectId ) , eq ( projects . companyId , agent . companyId ) ) )
. then ( ( rows ) = > parseProjectExecutionWorkspacePolicy ( rows [ 0 ] ? . executionWorkspacePolicy ) )
: null ;
2026-02-19 14:02:17 -06:00
const taskSession = taskKey
? await getTaskSession ( agent . companyId , agent . id , agent . adapterType , taskKey )
: null ;
2026-03-05 06:54:36 -06:00
const resetTaskSession = shouldResetTaskSessionForWake ( context ) ;
2026-03-05 09:48:11 -06:00
const sessionResetReason = describeSessionResetReason ( context ) ;
2026-03-05 06:54:36 -06:00
const taskSessionForRun = resetTaskSession ? null : taskSession ;
2026-02-19 14:02:17 -06:00
const previousSessionParams = normalizeSessionParams (
2026-03-05 06:54:36 -06:00
sessionCodec . deserialize ( taskSessionForRun ? . sessionParamsJson ? ? null ) ,
2026-02-19 14:02:17 -06:00
) ;
2026-03-10 09:03:31 -05:00
const config = parseObject ( agent . adapterConfig ) ;
const executionWorkspaceMode = resolveExecutionWorkspaceMode ( {
projectPolicy : projectExecutionWorkspacePolicy ,
issueSettings : issueExecutionWorkspaceSettings ,
legacyUseProjectWorkspace : issueAssigneeOverrides?.useProjectWorkspace ? ? null ,
} ) ;
2026-02-26 10:32:44 -06:00
const resolvedWorkspace = await resolveWorkspaceForRun (
agent ,
context ,
previousSessionParams ,
2026-03-10 09:03:31 -05:00
{ useProjectWorkspace : executionWorkspaceMode !== "agent_default" } ,
2026-02-26 10:32:44 -06:00
) ;
2026-03-10 09:03:31 -05:00
const workspaceManagedConfig = buildExecutionWorkspaceAdapterConfig ( {
agentConfig : config ,
projectPolicy : projectExecutionWorkspacePolicy ,
issueSettings : issueExecutionWorkspaceSettings ,
mode : executionWorkspaceMode ,
legacyUseProjectWorkspace : issueAssigneeOverrides?.useProjectWorkspace ? ? null ,
} ) ;
2026-03-10 10:58:38 -05:00
const mergedConfig = issueAssigneeOverrides ? . adapterConfig
2026-03-10 09:03:31 -05:00
? { . . . workspaceManagedConfig , . . . issueAssigneeOverrides . adapterConfig }
: workspaceManagedConfig ;
2026-03-10 10:58:38 -05:00
const { config : resolvedConfig , secretKeys } = await secretsSvc . resolveAdapterConfigForRuntime (
agent . companyId ,
mergedConfig ,
) ;
2026-03-15 07:05:01 -05:00
const runtimeSkillEntries = await companySkills . listRuntimeSkillEntries ( agent . companyId ) ;
const runtimeConfig = {
. . . resolvedConfig ,
paperclipRuntimeSkills : runtimeSkillEntries ,
} ;
2026-03-10 10:58:38 -05:00
const issueRef = issueId
? await db
. select ( {
id : issues.id ,
identifier : issues.identifier ,
title : issues.title ,
} )
. from ( issues )
. where ( and ( eq ( issues . id , issueId ) , eq ( issues . companyId , agent . companyId ) ) )
. then ( ( rows ) = > rows [ 0 ] ? ? null )
: null ;
const executionWorkspace = await realizeExecutionWorkspace ( {
base : {
baseCwd : resolvedWorkspace.cwd ,
source : resolvedWorkspace.source ,
projectId : resolvedWorkspace.projectId ,
workspaceId : resolvedWorkspace.workspaceId ,
repoUrl : resolvedWorkspace.repoUrl ,
repoRef : resolvedWorkspace.repoRef ,
} ,
config : resolvedConfig ,
issue : issueRef ,
agent : {
id : agent.id ,
name : agent.name ,
companyId : agent.companyId ,
} ,
} ) ;
2026-03-05 06:14:32 -06:00
const runtimeSessionResolution = resolveRuntimeSessionParamsForWorkspace ( {
agentId : agent.id ,
previousSessionParams ,
2026-03-10 10:58:38 -05:00
resolvedWorkspace : {
. . . resolvedWorkspace ,
cwd : executionWorkspace.cwd ,
} ,
2026-03-05 06:14:32 -06:00
} ) ;
const runtimeSessionParams = runtimeSessionResolution . sessionParams ;
const runtimeWorkspaceWarnings = [
. . . resolvedWorkspace . warnings ,
2026-03-10 10:58:38 -05:00
. . . executionWorkspace . warnings ,
2026-03-05 06:14:32 -06:00
. . . ( runtimeSessionResolution . warning ? [ runtimeSessionResolution . warning ] : [ ] ) ,
2026-03-05 09:48:11 -06:00
. . . ( resetTaskSession && sessionResetReason
2026-03-05 06:54:36 -06:00
? [
2026-03-05 09:48:11 -06:00
taskKey
? ` Skipping saved session resume for task " ${ taskKey } " because ${ sessionResetReason } . `
: ` Skipping saved session resume because ${ sessionResetReason } . ` ,
2026-03-05 06:54:36 -06:00
]
: [ ] ) ,
2026-03-05 06:14:32 -06:00
] ;
2026-02-25 08:38:58 -06:00
context . paperclipWorkspace = {
2026-03-10 10:58:38 -05:00
cwd : executionWorkspace.cwd ,
source : executionWorkspace.source ,
2026-03-10 09:03:31 -05:00
mode : executionWorkspaceMode ,
2026-03-10 10:58:38 -05:00
strategy : executionWorkspace.strategy ,
projectId : executionWorkspace.projectId ,
workspaceId : executionWorkspace.workspaceId ,
repoUrl : executionWorkspace.repoUrl ,
repoRef : executionWorkspace.repoRef ,
branchName : executionWorkspace.branchName ,
worktreePath : executionWorkspace.worktreePath ,
2026-03-14 00:36:53 -07:00
agentHome : resolveDefaultAgentWorkspaceDir ( agent . id ) ,
2026-02-25 08:38:58 -06:00
} ;
2026-02-25 21:35:33 -06:00
context . paperclipWorkspaces = resolvedWorkspace . workspaceHints ;
2026-03-10 10:58:38 -05:00
const runtimeServiceIntents = ( ( ) = > {
const runtimeConfig = parseObject ( resolvedConfig . workspaceRuntime ) ;
return Array . isArray ( runtimeConfig . services )
? runtimeConfig . services . filter (
( value ) : value is Record < string , unknown > = > typeof value === "object" && value !== null ,
)
: [ ] ;
} ) ( ) ;
if ( runtimeServiceIntents . length > 0 ) {
context . paperclipRuntimeServiceIntents = runtimeServiceIntents ;
} else {
delete context . paperclipRuntimeServiceIntents ;
}
if ( executionWorkspace . projectId && ! readNonEmptyString ( context . projectId ) ) {
context . projectId = executionWorkspace . projectId ;
2026-02-25 08:38:58 -06:00
}
2026-03-05 09:48:11 -06:00
const runtimeSessionFallback = taskKey || resetTaskSession ? null : runtime . sessionId ;
2026-03-13 08:49:11 -05:00
let previousSessionDisplayId = truncateDisplayId (
2026-03-05 06:54:36 -06:00
taskSessionForRun ? . sessionDisplayId ? ?
2026-03-05 06:14:32 -06:00
( sessionCodec . getDisplayId ? sessionCodec . getDisplayId ( runtimeSessionParams ) : null ) ? ?
readNonEmptyString ( runtimeSessionParams ? . sessionId ) ? ?
2026-02-20 15:48:22 -06:00
runtimeSessionFallback ,
2026-02-19 14:02:17 -06:00
) ;
2026-03-13 08:49:11 -05:00
let runtimeSessionIdForAdapter =
readNonEmptyString ( runtimeSessionParams ? . sessionId ) ? ? runtimeSessionFallback ;
let runtimeSessionParamsForAdapter = runtimeSessionParams ;
const sessionCompaction = await evaluateSessionCompaction ( {
agent ,
sessionId : previousSessionDisplayId ? ? runtimeSessionIdForAdapter ,
issueId ,
} ) ;
if ( sessionCompaction . rotate ) {
context . paperclipSessionHandoffMarkdown = sessionCompaction . handoffMarkdown ;
context . paperclipSessionRotationReason = sessionCompaction . reason ;
context . paperclipPreviousSessionId = previousSessionDisplayId ? ? runtimeSessionIdForAdapter ;
runtimeSessionIdForAdapter = null ;
runtimeSessionParamsForAdapter = null ;
previousSessionDisplayId = null ;
if ( sessionCompaction . reason ) {
runtimeWorkspaceWarnings . push (
` Starting a fresh session because ${ sessionCompaction . reason } . ` ,
) ;
}
} else {
delete context . paperclipSessionHandoffMarkdown ;
delete context . paperclipSessionRotationReason ;
delete context . paperclipPreviousSessionId ;
}
2026-02-19 14:02:17 -06:00
const runtimeForAdapter = {
2026-03-13 08:49:11 -05:00
sessionId : runtimeSessionIdForAdapter ,
sessionParams : runtimeSessionParamsForAdapter ,
2026-02-19 14:02:17 -06:00
sessionDisplayId : previousSessionDisplayId ,
taskKey ,
} ;
2026-02-17 12:24:43 -06:00
let seq = 1 ;
let handle : RunLogHandle | null = null ;
let stdoutExcerpt = "" ;
let stderrExcerpt = "" ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
try {
2026-02-20 12:50:34 -06:00
const startedAt = run . startedAt ? ? new Date ( ) ;
const runningWithSession = await db
. update ( heartbeatRuns )
. set ( {
startedAt ,
sessionIdBefore : runtimeForAdapter.sessionDisplayId ? ? runtimeForAdapter . sessionId ,
2026-03-10 10:58:38 -05:00
contextSnapshot : context ,
2026-02-20 12:50:34 -06:00
updatedAt : new Date ( ) ,
} )
. where ( eq ( heartbeatRuns . id , run . id ) )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
if ( runningWithSession ) run = runningWithSession ;
2026-02-17 12:24:43 -06:00
const runningAgent = await db
. update ( agents )
. set ( { status : "running" , updatedAt : new Date ( ) } )
. where ( eq ( agents . id , agent . id ) )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
if ( runningAgent ) {
publishLiveEvent ( {
companyId : runningAgent.companyId ,
type : "agent.status" ,
payload : {
agentId : runningAgent.id ,
status : runningAgent.status ,
outcome : "running" ,
} ,
} ) ;
}
2026-02-20 12:50:34 -06:00
const currentRun = run ;
2026-02-17 12:24:43 -06:00
await appendRunEvent ( currentRun , seq ++ , {
eventType : "lifecycle" ,
stream : "system" ,
level : "info" ,
message : "run started" ,
} ) ;
handle = await runLogStore . begin ( {
companyId : run.companyId ,
agentId : run.agentId ,
runId ,
} ) ;
await db
. update ( heartbeatRuns )
. set ( {
logStore : handle.store ,
logRef : handle.logRef ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( heartbeatRuns . id , runId ) ) ;
const onLog = async ( stream : "stdout" | "stderr" , chunk : string ) = > {
2026-03-11 17:46:23 -05:00
const sanitizedChunk = redactCurrentUserText ( chunk ) ;
if ( stream === "stdout" ) stdoutExcerpt = appendExcerpt ( stdoutExcerpt , sanitizedChunk ) ;
if ( stream === "stderr" ) stderrExcerpt = appendExcerpt ( stderrExcerpt , sanitizedChunk ) ;
2026-03-11 13:29:40 -05:00
const ts = new Date ( ) . toISOString ( ) ;
2026-02-17 12:24:43 -06:00
if ( handle ) {
await runLogStore . append ( handle , {
stream ,
2026-03-11 17:46:23 -05:00
chunk : sanitizedChunk ,
2026-03-11 13:29:40 -05:00
ts ,
2026-02-17 12:24:43 -06:00
} ) ;
}
const payloadChunk =
2026-03-11 17:46:23 -05:00
sanitizedChunk . length > MAX_LIVE_LOG_CHUNK_BYTES
? sanitizedChunk . slice ( sanitizedChunk . length - MAX_LIVE_LOG_CHUNK_BYTES )
: sanitizedChunk ;
2026-02-17 12:24:43 -06:00
publishLiveEvent ( {
companyId : run.companyId ,
type : "heartbeat.run.log" ,
payload : {
runId : run.id ,
agentId : run.agentId ,
2026-03-11 13:29:40 -05:00
ts ,
2026-02-17 12:24:43 -06:00
stream ,
chunk : payloadChunk ,
2026-03-11 17:46:23 -05:00
truncated : payloadChunk.length !== sanitizedChunk . length ,
2026-02-17 12:24:43 -06:00
} ,
} ) ;
} ;
2026-03-05 06:14:32 -06:00
for ( const warning of runtimeWorkspaceWarnings ) {
await onLog ( "stderr" , ` [paperclip] ${ warning } \ n ` ) ;
}
2026-03-10 10:58:38 -05:00
const adapterEnv = Object . fromEntries (
Object . entries ( parseObject ( resolvedConfig . env ) ) . filter (
( entry ) : entry is [ string , string ] = > typeof entry [ 0 ] === "string" && typeof entry [ 1 ] === "string" ,
) ,
2026-02-19 15:43:52 -06:00
) ;
2026-03-10 10:58:38 -05:00
const runtimeServices = await ensureRuntimeServicesForRun ( {
db ,
runId : run.id ,
agent : {
id : agent.id ,
name : agent.name ,
companyId : agent.companyId ,
} ,
issue : issueRef ,
workspace : executionWorkspace ,
config : resolvedConfig ,
adapterEnv ,
onLog ,
} ) ;
if ( runtimeServices . length > 0 ) {
context . paperclipRuntimeServices = runtimeServices ;
context . paperclipRuntimePrimaryUrl =
runtimeServices . find ( ( service ) = > readNonEmptyString ( service . url ) ) ? . url ? ? null ;
await db
. update ( heartbeatRuns )
. set ( {
contextSnapshot : context ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( heartbeatRuns . id , run . id ) ) ;
}
if ( issueId && ( executionWorkspace . created || runtimeServices . some ( ( service ) = > ! service . reused ) ) ) {
try {
await issuesSvc . addComment (
issueId ,
buildWorkspaceReadyComment ( {
workspace : executionWorkspace ,
runtimeServices ,
} ) ,
{ agentId : agent.id } ,
) ;
} catch ( err ) {
await onLog (
"stderr" ,
` [paperclip] Failed to post workspace-ready comment: ${ err instanceof Error ? err.message : String ( err ) } \ n ` ,
) ;
}
}
2026-02-18 13:02:17 -06:00
const onAdapterMeta = async ( meta : AdapterInvocationMeta ) = > {
2026-03-07 16:04:09 -08:00
if ( meta . env && secretKeys . size > 0 ) {
for ( const key of secretKeys ) {
if ( key in meta . env ) meta . env [ key ] = "***REDACTED***" ;
}
}
2026-02-18 13:02:17 -06:00
await appendRunEvent ( currentRun , seq ++ , {
eventType : "adapter.invoke" ,
stream : "system" ,
level : "info" ,
message : "adapter invocation" ,
2026-02-18 13:53:03 -06:00
payload : meta as unknown as Record < string , unknown > ,
2026-02-18 13:02:17 -06:00
} ) ;
} ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
2026-02-18 13:53:03 -06:00
const adapter = getServerAdapter ( agent . adapterType ) ;
2026-02-18 16:46:45 -06:00
const authToken = adapter . supportsLocalAgentJwt
? createLocalAgentJwt ( agent . id , agent . companyId , agent . adapterType , run . id )
: null ;
if ( adapter . supportsLocalAgentJwt && ! authToken ) {
logger . warn (
{
companyId : agent.companyId ,
agentId : agent.id ,
runId : run.id ,
adapterType : agent.adapterType ,
} ,
"local agent jwt secret missing or invalid; running without injected PAPERCLIP_API_KEY" ,
) ;
}
2026-02-18 13:53:03 -06:00
const adapterResult = await adapter . execute ( {
runId : run.id ,
agent ,
2026-02-19 14:02:17 -06:00
runtime : runtimeForAdapter ,
2026-03-15 07:05:01 -05:00
config : runtimeConfig ,
2026-02-18 13:53:03 -06:00
context ,
onLog ,
onMeta : onAdapterMeta ,
2026-02-18 16:46:45 -06:00
authToken : authToken ? ? undefined ,
2026-02-18 13:53:03 -06:00
} ) ;
2026-03-10 10:58:38 -05:00
const adapterManagedRuntimeServices = adapterResult . runtimeServices
? await persistAdapterManagedRuntimeServices ( {
db ,
adapterType : agent.adapterType ,
runId : run.id ,
agent : {
id : agent.id ,
name : agent.name ,
companyId : agent.companyId ,
} ,
issue : issueRef ,
workspace : executionWorkspace ,
reports : adapterResult.runtimeServices ,
} )
: [ ] ;
if ( adapterManagedRuntimeServices . length > 0 ) {
const combinedRuntimeServices = [
. . . runtimeServices ,
. . . adapterManagedRuntimeServices ,
] ;
context . paperclipRuntimeServices = combinedRuntimeServices ;
context . paperclipRuntimePrimaryUrl =
combinedRuntimeServices . find ( ( service ) = > readNonEmptyString ( service . url ) ) ? . url ? ? null ;
await db
. update ( heartbeatRuns )
. set ( {
contextSnapshot : context ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( heartbeatRuns . id , run . id ) ) ;
if ( issueId ) {
try {
await issuesSvc . addComment (
issueId ,
buildWorkspaceReadyComment ( {
workspace : executionWorkspace ,
runtimeServices : adapterManagedRuntimeServices ,
} ) ,
{ agentId : agent.id } ,
) ;
} catch ( err ) {
await onLog (
"stderr" ,
` [paperclip] Failed to post adapter-managed runtime comment: ${ err instanceof Error ? err.message : String ( err ) } \ n ` ,
) ;
}
}
}
2026-02-19 14:02:17 -06:00
const nextSessionState = resolveNextSessionState ( {
codec : sessionCodec ,
adapterResult ,
previousParams : previousSessionParams ,
previousDisplayId : runtimeForAdapter.sessionDisplayId ,
previousLegacySessionId : runtimeForAdapter.sessionId ,
} ) ;
2026-03-13 08:49:11 -05:00
const rawUsage = normalizeUsageTotals ( adapterResult . usage ) ;
const sessionUsageResolution = await resolveNormalizedUsageForSession ( {
agentId : agent.id ,
runId : run.id ,
sessionId : nextSessionState.displayId ? ? nextSessionState . legacySessionId ,
rawUsage ,
} ) ;
const normalizedUsage = sessionUsageResolution . normalizedUsage ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
2026-02-17 12:24:43 -06:00
let outcome : "succeeded" | "failed" | "cancelled" | "timed_out" ;
const latestRun = await getRun ( run . id ) ;
if ( latestRun ? . status === "cancelled" ) {
outcome = "cancelled" ;
} else if ( adapterResult . timedOut ) {
outcome = "timed_out" ;
} else if ( ( adapterResult . exitCode ? ? 0 ) === 0 && ! adapterResult . errorMessage ) {
outcome = "succeeded" ;
} else {
outcome = "failed" ;
}
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
2026-02-17 12:24:43 -06:00
let logSummary : { bytes : number ; sha256? : string ; compressed : boolean } | null = null ;
if ( handle ) {
logSummary = await runLogStore . finalize ( handle ) ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
}
2026-02-17 12:24:43 -06:00
const status =
outcome === "succeeded"
? "succeeded"
: outcome === "cancelled"
? "cancelled"
: outcome === "timed_out"
? "timed_out"
: "failed" ;
2026-02-18 13:02:17 -06:00
const usageJson =
2026-03-13 08:49:11 -05:00
normalizedUsage || adapterResult . costUsd != null
2026-02-18 13:02:17 -06:00
? ( {
2026-03-13 08:49:11 -05:00
. . . ( normalizedUsage ? ? { } ) ,
. . . ( rawUsage ? {
rawInputTokens : rawUsage.inputTokens ,
rawCachedInputTokens : rawUsage.cachedInputTokens ,
rawOutputTokens : rawUsage.outputTokens ,
} : { } ) ,
. . . ( sessionUsageResolution . derivedFromSessionTotals ? { usageSource : "session_delta" } : { } ) ,
. . . ( ( nextSessionState . displayId ? ? nextSessionState . legacySessionId )
? { persistedSessionId : nextSessionState.displayId ? ? nextSessionState . legacySessionId }
: { } ) ,
sessionReused : runtimeForAdapter.sessionId != null || runtimeForAdapter . sessionDisplayId != null ,
taskSessionReused : taskSessionForRun != null ,
freshSession : runtimeForAdapter.sessionId == null && runtimeForAdapter . sessionDisplayId == null ,
sessionRotated : sessionCompaction.rotate ,
sessionRotationReason : sessionCompaction.reason ,
2026-02-18 13:02:17 -06:00
. . . ( adapterResult . costUsd != null ? { costUsd : adapterResult.costUsd } : { } ) ,
2026-02-25 21:35:33 -06:00
. . . ( adapterResult . billingType ? { billingType : adapterResult.billingType } : { } ) ,
2026-02-18 13:02:17 -06:00
} as Record < string , unknown > )
: null ;
2026-02-17 12:24:43 -06:00
await setRunStatus ( run . id , status , {
finishedAt : new Date ( ) ,
error :
outcome === "succeeded"
? null
2026-03-11 17:46:23 -05:00
: redactCurrentUserText (
adapterResult . errorMessage ? ? ( outcome === "timed_out" ? "Timed out" : "Adapter failed" ) ,
) ,
2026-02-17 12:24:43 -06:00
errorCode :
outcome === "timed_out"
? "timeout"
: outcome === "cancelled"
? "cancelled"
: outcome === "failed"
2026-02-23 14:40:32 -06:00
? ( adapterResult . errorCode ? ? "adapter_failed" )
2026-02-17 12:24:43 -06:00
: null ,
exitCode : adapterResult.exitCode ,
signal : adapterResult.signal ,
2026-02-18 13:02:17 -06:00
usageJson ,
2026-02-17 12:24:43 -06:00
resultJson : adapterResult.resultJson ? ? null ,
2026-02-19 14:02:17 -06:00
sessionIdAfter : nextSessionState.displayId ? ? nextSessionState . legacySessionId ,
2026-02-17 12:24:43 -06:00
stdoutExcerpt ,
stderrExcerpt ,
logBytes : logSummary?.bytes ,
logSha256 : logSummary?.sha256 ,
logCompressed : logSummary?.compressed ? ? false ,
} ) ;
await setWakeupStatus ( run . wakeupRequestId , outcome === "succeeded" ? "completed" : status , {
finishedAt : new Date ( ) ,
error : adapterResult.errorMessage ? ? null ,
} ) ;
const finalizedRun = await getRun ( run . id ) ;
if ( finalizedRun ) {
await appendRunEvent ( finalizedRun , seq ++ , {
eventType : "lifecycle" ,
stream : "system" ,
level : outcome === "succeeded" ? "info" : "error" ,
message : ` run ${ outcome } ` ,
payload : {
status ,
exitCode : adapterResult.exitCode ,
} ,
} ) ;
2026-02-20 15:48:22 -06:00
await releaseIssueExecutionAndPromote ( finalizedRun ) ;
2026-02-17 12:24:43 -06:00
}
if ( finalizedRun ) {
2026-02-19 14:02:17 -06:00
await updateRuntimeState ( agent , finalizedRun , adapterResult , {
legacySessionId : nextSessionState.legacySessionId ,
2026-03-13 08:49:11 -05:00
} , normalizedUsage ) ;
2026-02-19 14:02:17 -06:00
if ( taskKey ) {
if ( adapterResult . clearSession || ( ! nextSessionState . params && ! nextSessionState . displayId ) ) {
await clearTaskSessions ( agent . companyId , agent . id , {
taskKey ,
adapterType : agent.adapterType ,
} ) ;
} else {
await upsertTaskSession ( {
companyId : agent.companyId ,
agentId : agent.id ,
adapterType : agent.adapterType ,
taskKey ,
sessionParamsJson : nextSessionState.params ,
sessionDisplayId : nextSessionState.displayId ,
lastRunId : finalizedRun.id ,
lastError : outcome === "succeeded" ? null : ( adapterResult . errorMessage ? ? "run_failed" ) ,
} ) ;
}
}
2026-02-17 12:24:43 -06:00
}
await finalizeAgentStatus ( agent . id , outcome ) ;
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
} catch ( err ) {
2026-03-11 17:46:23 -05:00
const message = redactCurrentUserText ( err instanceof Error ? err . message : "Unknown adapter failure" ) ;
2026-02-17 12:24:43 -06:00
logger . error ( { err , runId } , "heartbeat execution failed" ) ;
let logSummary : { bytes : number ; sha256? : string ; compressed : boolean } | null = null ;
if ( handle ) {
try {
logSummary = await runLogStore . finalize ( handle ) ;
} catch ( finalizeErr ) {
logger . warn ( { err : finalizeErr , runId } , "failed to finalize run log after error" ) ;
}
}
const failedRun = await setRunStatus ( run . id , "failed" , {
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
error : message ,
2026-02-17 12:24:43 -06:00
errorCode : "adapter_failed" ,
finishedAt : new Date ( ) ,
stdoutExcerpt ,
stderrExcerpt ,
logBytes : logSummary?.bytes ,
logSha256 : logSummary?.sha256 ,
logCompressed : logSummary?.compressed ? ? false ,
} ) ;
await setWakeupStatus ( run . wakeupRequestId , "failed" , {
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
finishedAt : new Date ( ) ,
2026-02-17 12:24:43 -06:00
error : message ,
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
} ) ;
2026-02-17 12:24:43 -06:00
if ( failedRun ) {
await appendRunEvent ( failedRun , seq ++ , {
eventType : "error" ,
stream : "system" ,
level : "error" ,
message ,
} ) ;
2026-02-20 15:48:22 -06:00
await releaseIssueExecutionAndPromote ( failedRun ) ;
2026-02-17 12:24:43 -06:00
await updateRuntimeState ( agent , failedRun , {
exitCode : null ,
signal : null ,
timedOut : false ,
errorMessage : message ,
2026-02-19 14:02:17 -06:00
} , {
legacySessionId : runtimeForAdapter.sessionId ,
2026-02-17 12:24:43 -06:00
} ) ;
2026-02-19 14:02:17 -06:00
if ( taskKey && ( previousSessionParams || previousSessionDisplayId || taskSession ) ) {
await upsertTaskSession ( {
companyId : agent.companyId ,
agentId : agent.id ,
adapterType : agent.adapterType ,
taskKey ,
sessionParamsJson : previousSessionParams ,
sessionDisplayId : previousSessionDisplayId ,
lastRunId : failedRun.id ,
lastError : message ,
} ) ;
}
2026-02-17 12:24:43 -06:00
}
await finalizeAgentStatus ( agent . id , "failed" ) ;
2026-03-13 06:56:31 -05:00
}
} catch ( outerErr ) {
2026-03-07 12:37:15 -05:00
// Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun).
// The inner catch did not fire, so we must record the failure here.
const message = outerErr instanceof Error ? outerErr . message : "Unknown setup failure" ;
logger . error ( { err : outerErr , runId } , "heartbeat execution setup failed" ) ;
await setRunStatus ( runId , "failed" , {
error : message ,
errorCode : "adapter_failed" ,
finishedAt : new Date ( ) ,
} ) . catch ( ( ) = > undefined ) ;
await setWakeupStatus ( run . wakeupRequestId , "failed" , {
finishedAt : new Date ( ) ,
error : message ,
} ) . catch ( ( ) = > undefined ) ;
const failedRun = await getRun ( runId ) . catch ( ( ) = > null ) ;
if ( failedRun ) {
// Emit a run-log event so the failure is visible in the run timeline,
// consistent with what the inner catch block does for adapter failures.
await appendRunEvent ( failedRun , 1 , {
eventType : "error" ,
stream : "system" ,
level : "error" ,
message ,
} ) . catch ( ( ) = > undefined ) ;
await releaseIssueExecutionAndPromote ( failedRun ) . catch ( ( ) = > undefined ) ;
}
// Ensure the agent is not left stuck in "running" if the inner catch handler's
// DB calls threw (e.g. a transient DB error in finalizeAgentStatus).
await finalizeAgentStatus ( run . agentId , "failed" ) . catch ( ( ) = > undefined ) ;
} finally {
await releaseRuntimeServicesForRun ( run . id ) . catch ( ( ) = > undefined ) ;
2026-03-13 06:56:31 -05:00
activeRunExecutions . delete ( run . id ) ;
2026-03-07 12:37:15 -05:00
await startNextQueuedRunForAgent ( run . agentId ) ;
2026-03-13 06:56:31 -05:00
}
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
}
2026-02-20 15:48:22 -06:00
/**
 * Releases the issue execution lock held by `run` and, when possible, hands
 * it over to the oldest deferred wakeup request waiting on the same issue.
 *
 * Everything up to the hand-over happens inside a single transaction:
 *  1. The issue whose `execution_run_id` equals `run.id` is selected
 *     `for update`; if there is no such issue nothing else happens.
 *  2. The issue's execution fields are cleared.
 *  3. Wakeup requests in status `deferred_issue_execution` for that issue are
 *     visited oldest-first (by `requestedAt`). A request whose agent is
 *     missing, belongs to another company, or is paused / terminated /
 *     pending approval is marked `failed` and the next one is tried.
 *  4. The first promotable request gets a new `queued` heartbeat run, is
 *     itself moved to `queued`, and the issue's execution fields are pointed
 *     at the new run.
 *
 * Once the transaction has resolved with a promoted run, a
 * `heartbeat.run.queued` live event is published and
 * `startNextQueuedRunForAgent` is invoked for the promoted run's agent.
 *
 * @param run - The heartbeat run that has finished with the issue.
 */
async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
  const promotedRun = await db.transaction(async (tx) => {
    // Take a row lock on the issue currently held by this run before reading it.
    await tx.execute(
      sql` select id from issues where company_id = ${run.companyId} and execution_run_id = ${run.id} for update `,
    );

    const [heldIssue] = await tx
      .select({
        id: issues.id,
        companyId: issues.companyId,
      })
      .from(issues)
      .where(and(eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id)));
    if (!heldIssue) return;

    // Drop the execution lock; it is re-acquired below if a deferred wake is promoted.
    await tx
      .update(issues)
      .set({
        executionRunId: null,
        executionAgentNameKey: null,
        executionLockedAt: null,
        updatedAt: new Date(),
      })
      .where(eq(issues.id, heldIssue.id));

    // Walk the deferred requests oldest-first until one can be promoted or none remain.
    for (;;) {
      const [candidate] = await tx
        .select()
        .from(agentWakeupRequests)
        .where(
          and(
            eq(agentWakeupRequests.companyId, heldIssue.companyId),
            eq(agentWakeupRequests.status, "deferred_issue_execution"),
            sql` ${agentWakeupRequests.payload} ->> 'issueId' = ${heldIssue.id} `,
          ),
        )
        .orderBy(asc(agentWakeupRequests.requestedAt))
        .limit(1);
      if (!candidate) return null;

      const [candidateAgent] = await tx
        .select()
        .from(agents)
        .where(eq(agents.id, candidate.agentId));

      if (
        !candidateAgent ||
        candidateAgent.companyId !== heldIssue.companyId ||
        candidateAgent.status === "paused" ||
        candidateAgent.status === "terminated" ||
        candidateAgent.status === "pending_approval"
      ) {
        // Failing the request changes its status, so the next iteration's
        // query no longer matches it.
        await tx
          .update(agentWakeupRequests)
          .set({
            status: "failed",
            finishedAt: new Date(),
            error: "Deferred wake could not be promoted: agent is not invokable",
            updatedAt: new Date(),
          })
          .where(eq(agentWakeupRequests.id, candidate.id));
        continue;
      }

      // Split the stored payload into the saved context seed and the remaining payload.
      const wakePayload = parseObject(candidate.payload);
      const savedContext = parseObject(wakePayload[DEFERRED_WAKE_CONTEXT_KEY]);
      const contextSeed: Record<string, unknown> = { ...savedContext };
      const wakeReason = readNonEmptyString(candidate.reason) ?? "issue_execution_promoted";
      const wakeSource =
        (readNonEmptyString(candidate.source) as WakeupOptions["source"]) ?? "automation";
      const wakeTriggerDetail =
        (readNonEmptyString(candidate.triggerDetail) as WakeupOptions["triggerDetail"]) ?? null;
      // The context seed has been copied out above; remove it from the payload in place.
      delete wakePayload[DEFERRED_WAKE_CONTEXT_KEY];

      const enriched = enrichWakeContextSnapshot({
        contextSnapshot: contextSeed,
        reason: wakeReason,
        source: wakeSource,
        triggerDetail: wakeTriggerDetail,
        payload: wakePayload,
      });
      const priorSessionId = await resolveSessionBeforeForWakeup(candidateAgent, enriched.taskKey);
      const promotedAt = new Date();

      const [queuedRun] = await tx
        .insert(heartbeatRuns)
        .values({
          companyId: candidateAgent.companyId,
          agentId: candidateAgent.id,
          invocationSource: wakeSource,
          triggerDetail: wakeTriggerDetail,
          status: "queued",
          wakeupRequestId: candidate.id,
          contextSnapshot: enriched.contextSnapshot,
          sessionIdBefore: priorSessionId,
        })
        .returning();

      // Link the request to the run it produced and reset its lifecycle fields.
      await tx
        .update(agentWakeupRequests)
        .set({
          status: "queued",
          reason: "issue_execution_promoted",
          runId: queuedRun.id,
          claimedAt: null,
          finishedAt: null,
          error: null,
          updatedAt: promotedAt,
        })
        .where(eq(agentWakeupRequests.id, candidate.id));

      // Re-acquire the issue execution lock on behalf of the promoted run.
      await tx
        .update(issues)
        .set({
          executionRunId: queuedRun.id,
          executionAgentNameKey: normalizeAgentNameKey(candidateAgent.name),
          executionLockedAt: promotedAt,
          updatedAt: promotedAt,
        })
        .where(eq(issues.id, heldIssue.id));

      return queuedRun;
    }
  });

  if (!promotedRun) return;

  const { companyId, id, agentId, invocationSource, triggerDetail, wakeupRequestId } = promotedRun;
  publishLiveEvent({
    companyId,
    type: "heartbeat.run.queued",
    payload: {
      runId: id,
      agentId,
      invocationSource,
      triggerDetail,
      wakeupRequestId,
    },
  });
  await startNextQueuedRunForAgent(agentId);
}
2026-02-17 12:24:43 -06:00
async function enqueueWakeup ( agentId : string , opts : WakeupOptions = { } ) {
const source = opts . source ? ? "on_demand" ;
const triggerDetail = opts . triggerDetail ? ? null ;
2026-02-19 09:09:40 -06:00
const contextSnapshot : Record < string , unknown > = { . . . ( opts . contextSnapshot ? ? { } ) } ;
const reason = opts . reason ? ? null ;
const payload = opts . payload ? ? null ;
2026-02-20 15:48:22 -06:00
const {
contextSnapshot : enrichedContextSnapshot ,
issueIdFromPayload ,
taskKey ,
wakeCommentId ,
} = enrichWakeContextSnapshot ( {
contextSnapshot ,
reason ,
source ,
triggerDetail ,
payload ,
} ) ;
const issueId = readNonEmptyString ( enrichedContextSnapshot . issueId ) ? ? issueIdFromPayload ;
2026-02-17 12:24:43 -06:00
const agent = await getAgent ( agentId ) ;
if ( ! agent ) throw notFound ( "Agent not found" ) ;
Implement agent hiring, approval workflows, config revisions, LLM reflection, and sidebar badges
Agent management: hire endpoint with permission gates and pending_approval status,
config revision tracking with rollback, agent duplicate route, permission CRUD.
Block pending_approval agents from auth, heartbeat, and assignments.
Approvals: revision request/resubmit flow, approval comments CRUD, issue-approval
linking, auto-wake agents on approval decisions with context snapshot.
Costs: per-agent breakdown, period filtering (month/week/day/all), cost by agent
list endpoint.
Adapters: agentConfigurationDoc on all adapters, /llms/agent-configuration.txt
reflection routes. Inject PAPERCLIP_APPROVAL_ID, PAPERCLIP_APPROVAL_STATUS,
PAPERCLIP_LINKED_ISSUE_IDS into adapter environments.
Sidebar badges endpoint for pending approval/inbox counts. Dashboard and company
settings extensions.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 13:02:41 -06:00
if (
agent . status === "paused" ||
agent . status === "terminated" ||
agent . status === "pending_approval"
) {
2026-02-17 12:24:43 -06:00
throw conflict ( "Agent is not invokable in its current state" , { status : agent.status } ) ;
}
const policy = parseHeartbeatPolicy ( agent ) ;
const writeSkippedRequest = async ( reason : string ) = > {
await db . insert ( agentWakeupRequests ) . values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
reason ,
2026-02-19 09:09:40 -06:00
payload ,
2026-02-17 12:24:43 -06:00
status : "skipped" ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
finishedAt : new Date ( ) ,
} ) ;
} ;
if ( source === "timer" && ! policy . enabled ) {
await writeSkippedRequest ( "heartbeat.disabled" ) ;
return null ;
}
2026-02-18 16:46:45 -06:00
if ( source !== "timer" && ! policy . wakeOnDemand ) {
await writeSkippedRequest ( "heartbeat.wakeOnDemand.disabled" ) ;
2026-02-17 12:24:43 -06:00
return null ;
}
2026-02-26 16:33:39 -06:00
const bypassIssueExecutionLock =
reason === "issue_comment_mentioned" ||
readNonEmptyString ( enrichedContextSnapshot . wakeReason ) === "issue_comment_mentioned" ;
if ( issueId && ! bypassIssueExecutionLock ) {
2026-02-20 15:48:22 -06:00
const agentNameKey = normalizeAgentNameKey ( agent . name ) ;
const sessionBefore = await resolveSessionBeforeForWakeup ( agent , taskKey ) ;
const outcome = await db . transaction ( async ( tx ) = > {
await tx . execute (
sql ` select id from issues where id = ${ issueId } and company_id = ${ agent . companyId } for update ` ,
) ;
const issue = await tx
. select ( {
id : issues.id ,
companyId : issues.companyId ,
executionRunId : issues.executionRunId ,
executionAgentNameKey : issues.executionAgentNameKey ,
} )
. from ( issues )
. where ( and ( eq ( issues . id , issueId ) , eq ( issues . companyId , agent . companyId ) ) )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
if ( ! issue ) {
await tx . insert ( agentWakeupRequests ) . values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
reason : "issue_execution_issue_not_found" ,
payload ,
status : "skipped" ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
finishedAt : new Date ( ) ,
} ) ;
return { kind : "skipped" as const } ;
}
let activeExecutionRun = issue . executionRunId
? await tx
. select ( )
. from ( heartbeatRuns )
. where ( eq ( heartbeatRuns . id , issue . executionRunId ) )
. then ( ( rows ) = > rows [ 0 ] ? ? null )
: null ;
if ( activeExecutionRun && activeExecutionRun . status !== "queued" && activeExecutionRun . status !== "running" ) {
activeExecutionRun = null ;
}
if ( ! activeExecutionRun && issue . executionRunId ) {
await tx
. update ( issues )
. set ( {
executionRunId : null ,
executionAgentNameKey : null ,
executionLockedAt : null ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( issues . id , issue . id ) ) ;
}
if ( ! activeExecutionRun ) {
const legacyRun = await tx
. select ( )
. from ( heartbeatRuns )
. where (
and (
eq ( heartbeatRuns . companyId , issue . companyId ) ,
inArray ( heartbeatRuns . status , [ "queued" , "running" ] ) ,
sql ` ${ heartbeatRuns . contextSnapshot } ->> 'issueId' = ${ issue . id } ` ,
) ,
)
. orderBy (
sql ` case when ${ heartbeatRuns . status } = 'running' then 0 else 1 end ` ,
asc ( heartbeatRuns . createdAt ) ,
)
. limit ( 1 )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
if ( legacyRun ) {
activeExecutionRun = legacyRun ;
const legacyAgent = await tx
. select ( { name : agents.name } )
. from ( agents )
. where ( eq ( agents . id , legacyRun . agentId ) )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
await tx
. update ( issues )
. set ( {
executionRunId : legacyRun.id ,
executionAgentNameKey : normalizeAgentNameKey ( legacyAgent ? . name ) ,
executionLockedAt : new Date ( ) ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( issues . id , issue . id ) ) ;
}
}
if ( activeExecutionRun ) {
const executionAgent = await tx
. select ( { name : agents.name } )
. from ( agents )
. where ( eq ( agents . id , activeExecutionRun . agentId ) )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
const executionAgentNameKey =
normalizeAgentNameKey ( issue . executionAgentNameKey ) ? ?
normalizeAgentNameKey ( executionAgent ? . name ) ;
2026-03-02 16:43:59 -06:00
const isSameExecutionAgent =
Boolean ( executionAgentNameKey ) && executionAgentNameKey === agentNameKey ;
const shouldQueueFollowupForCommentWake =
Boolean ( wakeCommentId ) &&
activeExecutionRun . status === "running" &&
isSameExecutionAgent ;
if ( isSameExecutionAgent && ! shouldQueueFollowupForCommentWake ) {
2026-02-20 15:48:22 -06:00
const mergedContextSnapshot = mergeCoalescedContextSnapshot (
activeExecutionRun . contextSnapshot ,
enrichedContextSnapshot ,
) ;
const mergedRun = await tx
. update ( heartbeatRuns )
. set ( {
contextSnapshot : mergedContextSnapshot ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( heartbeatRuns . id , activeExecutionRun . id ) )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ? ? activeExecutionRun ) ;
await tx . insert ( agentWakeupRequests ) . values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
reason : "issue_execution_same_name" ,
payload ,
status : "coalesced" ,
coalescedCount : 1 ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
runId : mergedRun.id ,
finishedAt : new Date ( ) ,
} ) ;
return { kind : "coalesced" as const , run : mergedRun } ;
}
const deferredPayload = {
. . . ( payload ? ? { } ) ,
issueId ,
[ DEFERRED_WAKE_CONTEXT_KEY ] : enrichedContextSnapshot ,
} ;
2026-03-02 16:43:59 -06:00
const existingDeferred = await tx
. select ( )
. from ( agentWakeupRequests )
. where (
and (
eq ( agentWakeupRequests . companyId , agent . companyId ) ,
eq ( agentWakeupRequests . agentId , agentId ) ,
eq ( agentWakeupRequests . status , "deferred_issue_execution" ) ,
sql ` ${ agentWakeupRequests . payload } ->> 'issueId' = ${ issue . id } ` ,
) ,
)
. orderBy ( asc ( agentWakeupRequests . requestedAt ) )
. limit ( 1 )
. then ( ( rows ) = > rows [ 0 ] ? ? null ) ;
if ( existingDeferred ) {
const existingDeferredPayload = parseObject ( existingDeferred . payload ) ;
const existingDeferredContext = parseObject ( existingDeferredPayload [ DEFERRED_WAKE_CONTEXT_KEY ] ) ;
const mergedDeferredContext = mergeCoalescedContextSnapshot (
existingDeferredContext ,
enrichedContextSnapshot ,
) ;
const mergedDeferredPayload = {
. . . existingDeferredPayload ,
. . . ( payload ? ? { } ) ,
issueId ,
[ DEFERRED_WAKE_CONTEXT_KEY ] : mergedDeferredContext ,
} ;
await tx
. update ( agentWakeupRequests )
. set ( {
payload : mergedDeferredPayload ,
coalescedCount : ( existingDeferred . coalescedCount ? ? 0 ) + 1 ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( agentWakeupRequests . id , existingDeferred . id ) ) ;
return { kind : "deferred" as const } ;
}
2026-02-20 15:48:22 -06:00
await tx . insert ( agentWakeupRequests ) . values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
reason : "issue_execution_deferred" ,
payload : deferredPayload ,
status : "deferred_issue_execution" ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
} ) ;
return { kind : "deferred" as const } ;
}
const wakeupRequest = await tx
. insert ( agentWakeupRequests )
. values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
reason ,
payload ,
status : "queued" ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
} )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ) ;
const newRun = await tx
. insert ( heartbeatRuns )
. values ( {
companyId : agent.companyId ,
agentId ,
invocationSource : source ,
triggerDetail ,
status : "queued" ,
wakeupRequestId : wakeupRequest.id ,
contextSnapshot : enrichedContextSnapshot ,
sessionIdBefore : sessionBefore ,
} )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ) ;
await tx
. update ( agentWakeupRequests )
. set ( {
runId : newRun.id ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( agentWakeupRequests . id , wakeupRequest . id ) ) ;
await tx
. update ( issues )
. set ( {
executionRunId : newRun.id ,
executionAgentNameKey : agentNameKey ,
executionLockedAt : new Date ( ) ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( issues . id , issue . id ) ) ;
return { kind : "queued" as const , run : newRun } ;
} ) ;
if ( outcome . kind === "deferred" || outcome . kind === "skipped" ) return null ;
if ( outcome . kind === "coalesced" ) return outcome . run ;
const newRun = outcome . run ;
publishLiveEvent ( {
companyId : newRun.companyId ,
type : "heartbeat.run.queued" ,
payload : {
runId : newRun.id ,
agentId : newRun.agentId ,
invocationSource : newRun.invocationSource ,
triggerDetail : newRun.triggerDetail ,
wakeupRequestId : newRun.wakeupRequestId ,
} ,
} ) ;
await startNextQueuedRunForAgent ( agent . id ) ;
return newRun ;
}
2026-02-19 14:02:17 -06:00
const activeRuns = await db
2026-02-17 12:24:43 -06:00
. select ( )
. from ( heartbeatRuns )
. where ( and ( eq ( heartbeatRuns . agentId , agentId ) , inArray ( heartbeatRuns . status , [ "queued" , "running" ] ) ) )
2026-02-19 14:02:17 -06:00
. orderBy ( desc ( heartbeatRuns . createdAt ) ) ;
2026-02-20 10:32:17 -06:00
const sameScopeQueuedRun = activeRuns . find (
( candidate ) = > candidate . status === "queued" && isSameTaskScope ( runTaskKey ( candidate ) , taskKey ) ,
2026-02-19 14:02:17 -06:00
) ;
2026-02-20 10:32:17 -06:00
const sameScopeRunningRun = activeRuns . find (
( candidate ) = > candidate . status === "running" && isSameTaskScope ( runTaskKey ( candidate ) , taskKey ) ,
) ;
const shouldQueueFollowupForCommentWake =
Boolean ( wakeCommentId ) && Boolean ( sameScopeRunningRun ) && ! sameScopeQueuedRun ;
const coalescedTargetRun =
sameScopeQueuedRun ? ?
( shouldQueueFollowupForCommentWake ? null : sameScopeRunningRun ? ? null ) ;
if ( coalescedTargetRun ) {
const mergedContextSnapshot = mergeCoalescedContextSnapshot (
coalescedTargetRun . contextSnapshot ,
contextSnapshot ,
) ;
const mergedRun = await db
. update ( heartbeatRuns )
. set ( {
contextSnapshot : mergedContextSnapshot ,
updatedAt : new Date ( ) ,
} )
. where ( eq ( heartbeatRuns . id , coalescedTargetRun . id ) )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ? ? coalescedTargetRun ) ;
2026-02-17 12:24:43 -06:00
await db . insert ( agentWakeupRequests ) . values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
2026-02-19 09:09:40 -06:00
reason ,
payload ,
2026-02-17 12:24:43 -06:00
status : "coalesced" ,
coalescedCount : 1 ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
2026-02-20 10:32:17 -06:00
runId : mergedRun.id ,
2026-02-17 12:24:43 -06:00
finishedAt : new Date ( ) ,
} ) ;
2026-02-20 10:32:17 -06:00
return mergedRun ;
2026-02-17 12:24:43 -06:00
}
const wakeupRequest = await db
. insert ( agentWakeupRequests )
. values ( {
companyId : agent.companyId ,
agentId ,
source ,
triggerDetail ,
2026-02-19 09:09:40 -06:00
reason ,
payload ,
2026-02-17 12:24:43 -06:00
status : "queued" ,
requestedByActorType : opts.requestedByActorType ? ? null ,
requestedByActorId : opts.requestedByActorId ? ? null ,
idempotencyKey : opts.idempotencyKey ? ? null ,
} )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ) ;
2026-02-20 15:48:22 -06:00
const sessionBefore = await resolveSessionBeforeForWakeup ( agent , taskKey ) ;
2026-02-17 12:24:43 -06:00
2026-02-18 13:53:03 -06:00
const newRun = await db
2026-02-17 12:24:43 -06:00
. insert ( heartbeatRuns )
. values ( {
companyId : agent.companyId ,
agentId ,
invocationSource : source ,
triggerDetail ,
status : "queued" ,
wakeupRequestId : wakeupRequest.id ,
2026-02-20 15:48:22 -06:00
contextSnapshot : enrichedContextSnapshot ,
2026-02-19 14:02:17 -06:00
sessionIdBefore : sessionBefore ,
2026-02-17 12:24:43 -06:00
} )
. returning ( )
. then ( ( rows ) = > rows [ 0 ] ) ;
await db
. update ( agentWakeupRequests )
. set ( {
2026-02-18 13:53:03 -06:00
runId : newRun.id ,
2026-02-17 12:24:43 -06:00
updatedAt : new Date ( ) ,
} )
. where ( eq ( agentWakeupRequests . id , wakeupRequest . id ) ) ;
publishLiveEvent ( {
2026-02-18 13:53:03 -06:00
companyId : newRun.companyId ,
2026-02-17 12:24:43 -06:00
type : "heartbeat.run.queued" ,
payload : {
2026-02-18 13:53:03 -06:00
runId : newRun.id ,
agentId : newRun.agentId ,
invocationSource : newRun.invocationSource ,
triggerDetail : newRun.triggerDetail ,
wakeupRequestId : newRun.wakeupRequestId ,
2026-02-17 12:24:43 -06:00
} ,
} ) ;
2026-02-19 14:02:17 -06:00
await startNextQueuedRunForAgent ( agent . id ) ;
2026-02-17 12:24:43 -06:00
2026-02-18 13:53:03 -06:00
return newRun ;
2026-02-17 12:24:43 -06:00
}
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
return {
2026-03-10 21:16:33 -05:00
/**
 * Lists heartbeat runs for a company, newest first (by `createdAt`).
 *
 * @param companyId - Company whose runs are returned.
 * @param agentId - When provided (truthy), only runs of this agent are returned.
 * @param limit - When provided (truthy), caps the number of rows; a missing or
 *   zero limit returns every matching row.
 * @returns The selected list columns for each run, with `resultJson` passed
 *   through `summarizeHeartbeatRunResultJson`.
 */
list: async (companyId: string, agentId?: string, limit?: number) => {
  const companyFilter = eq(heartbeatRuns.companyId, companyId);
  const scopeFilter = agentId
    ? and(companyFilter, eq(heartbeatRuns.agentId, agentId))
    : companyFilter;

  const newestFirst = db
    .select(heartbeatRunListColumns)
    .from(heartbeatRuns)
    .where(scopeFilter)
    .orderBy(desc(heartbeatRuns.createdAt));

  const runRows = limit ? await newestFirst.limit(limit) : await newestFirst;
  return runRows.map((runRow) => {
    const resultJson = summarizeHeartbeatRunResultJson(runRow.resultJson);
    return { ...runRow, resultJson };
  });
},
getRun ,
/**
 * Returns the runtime state of an agent, decorated with session display data.
 *
 * The stored state is looked up first; when that lookup yields nothing the
 * state is obtained from `ensureRuntimeState(agent)` instead. The agent's most
 * recently updated task session (if any) supplies `sessionDisplayId` and
 * `sessionParamsJson`.
 *
 * @param agentId - Agent to inspect.
 * @returns The decorated runtime state, or `null` when the agent does not exist.
 */
getRuntimeState: async (agentId: string) => {
  const storedState = await getRuntimeState(agentId);
  const agent = await getAgent(agentId);
  if (!agent) {
    return null;
  }

  const runtime = storedState ?? (await ensureRuntimeState(agent));

  const [newestSession] = await db
    .select()
    .from(agentTaskSessions)
    .where(
      and(
        eq(agentTaskSessions.companyId, agent.companyId),
        eq(agentTaskSessions.agentId, agent.id),
      ),
    )
    .orderBy(desc(agentTaskSessions.updatedAt))
    .limit(1);

  return {
    ...runtime,
    // Fall back to the runtime-level session id when no task session exists.
    sessionDisplayId: newestSession?.sessionDisplayId ?? runtime.sessionId,
    sessionParamsJson: newestSession?.sessionParamsJson ?? null,
  };
},
2026-02-19 14:02:17 -06:00
/**
 * Lists every task session recorded for an agent, ordered by `updatedAt`
 * and then `createdAt`, both descending.
 *
 * @param agentId - Agent whose task sessions are listed.
 * @throws notFound("Agent not found") when the agent does not exist.
 */
listTaskSessions: async (agentId: string) => {
  const agent = await getAgent(agentId);
  if (!agent) throw notFound("Agent not found");

  const ownedByAgent = and(
    eq(agentTaskSessions.companyId, agent.companyId),
    eq(agentTaskSessions.agentId, agentId),
  );
  const sessions = await db
    .select()
    .from(agentTaskSessions)
    .where(ownedByAgent)
    .orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt));
  return sessions;
},
/**
 * Resets an agent's runtime session and clears its task sessions.
 *
 * With a non-empty `opts.taskKey`, `clearTaskSessions` is scoped to that task
 * key and the agent's adapter type; otherwise it is called without a filter and
 * the runtime `stateJson` is additionally reset to `{}`. `sessionId` and
 * `lastError` are cleared in both cases.
 *
 * @param agentId - Agent whose session is reset.
 * @param opts - Optional `taskKey` restricting which task sessions are cleared.
 * @returns The updated runtime state row with `sessionDisplayId` and
 *   `sessionParamsJson` nulled plus the `clearedTaskSessions` result, or `null`
 *   when no runtime state row was updated.
 * @throws notFound("Agent not found") when the agent does not exist.
 */
resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => {
  const agent = await getAgent(agentId);
  if (!agent) throw notFound("Agent not found");
  await ensureRuntimeState(agent);

  const taskKey = readNonEmptyString(opts?.taskKey);
  const clearFilter = taskKey ? { taskKey, adapterType: agent.adapterType } : undefined;
  const clearedTaskSessions = await clearTaskSessions(agent.companyId, agent.id, clearFilter);

  const runtimePatch: Partial<typeof agentRuntimeState.$inferInsert> = {
    sessionId: null,
    lastError: null,
    updatedAt: new Date(),
    // Only a full (non task-scoped) reset wipes the persisted adapter state.
    ...(taskKey ? {} : { stateJson: {} }),
  };

  const [updatedState] = await db
    .update(agentRuntimeState)
    .set(runtimePatch)
    .where(eq(agentRuntimeState.agentId, agentId))
    .returning();
  if (!updatedState) {
    return null;
  }

  return {
    ...updatedState,
    sessionDisplayId: null,
    sessionParamsJson: null,
    clearedTaskSessions,
  };
},
/**
 * Pages through the events of a run in ascending `seq` order.
 *
 * @param runId - Run whose events are read.
 * @param afterSeq - Only events with `seq` strictly greater than this are returned.
 * @param limit - Requested page size; clamped to the range 1..1000.
 * @returns The query for the matching event rows.
 */
listEvents: (runId: string, afterSeq = 0, limit = 200) => {
  const pageSize = Math.max(1, Math.min(limit, 1000));
  const newerEventsOfRun = and(
    eq(heartbeatRunEvents.runId, runId),
    gt(heartbeatRunEvents.seq, afterSeq),
  );
  return db
    .select()
    .from(heartbeatRunEvents)
    .where(newerEventsOfRun)
    .orderBy(asc(heartbeatRunEvents.seq))
    .limit(pageSize);
},
/**
 * Reads a slice of a run's log through the run log store.
 *
 * @param runId - Run whose log is read.
 * @param opts - Optional `offset` / `limitBytes`, forwarded unchanged to
 *   `runLogStore.read`.
 * @returns The store's read result merged over `runId`, `store` and `logRef`,
 *   with `content` passed through `redactCurrentUserText`.
 * @throws notFound("Heartbeat run not found") when the run does not exist.
 * @throws notFound("Run log not found") when the run has no log store or log ref.
 */
readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
  const run = await getRun(runId);
  if (!run) throw notFound("Heartbeat run not found");

  const { logStore, logRef } = run;
  if (!logStore || !logRef) throw notFound("Run log not found");

  const handle = { store: logStore as "local_file", logRef };
  const chunk = await runLogStore.read(handle, opts);

  return {
    runId,
    store: logStore,
    logRef,
    // Spread after the identifying fields so the store's result wins on overlap.
    ...chunk,
    content: redactCurrentUserText(chunk.content),
  };
},
/**
 * Positional-argument convenience wrapper around `enqueueWakeup`.
 *
 * @param agentId - Agent to wake.
 * @param source - Invocation source; defaults to `"on_demand"`.
 * @param contextSnapshot - Context forwarded with the wakeup; defaults to `{}`.
 * @param triggerDetail - Trigger detail; defaults to `"manual"`.
 * @param actor - Optional requesting actor; a missing `actorId` is sent as `null`.
 * @returns Whatever `enqueueWakeup` resolves to.
 */
invoke: async (
  agentId: string,
  source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
  contextSnapshot: Record<string, unknown> = {},
  triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
  actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
) => {
  const requestedByActorType = actor?.actorType;
  const requestedByActorId = actor?.actorId ?? null;
  return enqueueWakeup(agentId, {
    source,
    triggerDetail,
    contextSnapshot,
    requestedByActorType,
    requestedByActorId,
  });
},
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
2026-02-17 12:24:43 -06:00
wakeup : enqueueWakeup ,
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
2026-02-19 09:09:40 -06:00
reapOrphanedRuns ,
2026-03-13 06:56:31 -05:00
resumeQueuedRuns ,
2026-02-17 12:24:43 -06:00
/**
 * Scheduler tick: enqueues a timer wakeup for every agent whose heartbeat
 * interval has elapsed.
 *
 * Agents that are paused, terminated or pending approval are ignored, as are
 * agents whose heartbeat policy is disabled or has a non-positive interval.
 * The interval is measured from `lastHeartbeatAt`, falling back to `createdAt`.
 * Agents are processed one at a time, in the order returned by the query.
 *
 * @param now - Reference time for the tick; defaults to the current time.
 * @returns Counters: `checked` (agents with an active policy), `enqueued`
 *   (wakeups that produced a run) and `skipped` (due wakeups for which
 *   `enqueueWakeup` returned nothing).
 */
tickTimers: async (now = new Date()) => {
  const inactiveStatuses = new Set<string>(["paused", "terminated", "pending_approval"]);
  const nowMs = now.getTime();
  const nowIso = now.toISOString();
  const tally = { checked: 0, enqueued: 0, skipped: 0 };

  const everyAgent = await db.select().from(agents);
  for (const agent of everyAgent) {
    if (inactiveStatuses.has(agent.status)) continue;

    const policy = parseHeartbeatPolicy(agent);
    const timerActive = policy.enabled && policy.intervalSec > 0;
    if (!timerActive) continue;

    tally.checked += 1;

    const lastBeatMs = new Date(agent.lastHeartbeatAt ?? agent.createdAt).getTime();
    const isDue = nowMs - lastBeatMs >= policy.intervalSec * 1000;
    if (!isDue) continue;

    const run = await enqueueWakeup(agent.id, {
      source: "timer",
      triggerDetail: "system",
      reason: "heartbeat_timer",
      requestedByActorType: "system",
      requestedByActorId: "heartbeat_scheduler",
      contextSnapshot: {
        source: "scheduler",
        reason: "interval_elapsed",
        now: nowIso,
      },
    });

    if (run) {
      tally.enqueued += 1;
    } else {
      tally.skipped += 1;
    }
  }

  return { checked: tally.checked, enqueued: tally.enqueued, skipped: tally.skipped };
},
/**
 * Cancels a queued or running heartbeat run.
 *
 * If a process is tracked for the run it is sent SIGTERM and, when it has
 * still not exited after its grace period (at least one second), SIGKILL.
 * The run and its wakeup request are then marked cancelled, a lifecycle event
 * is appended, the issue execution lock is released, the agent status is
 * finalized and the agent's next queued run is started.
 *
 * @param runId - Run to cancel.
 * @returns The run unchanged when it is neither queued nor running; otherwise
 *   the result of `setRunStatus` for the cancelled run.
 * @throws notFound("Heartbeat run not found") when the run does not exist.
 */
cancelRun: async (runId: string) => {
  const run = await getRun(runId);
  if (!run) throw notFound("Heartbeat run not found");
  if (run.status !== "running" && run.status !== "queued") return run;

  const running = runningProcesses.get(run.id);
  if (running) {
    const { child } = running;
    child.kill("SIGTERM");
    const graceMs = Math.max(1, running.graceSec) * 1000;
    setTimeout(() => {
      // `child.killed` only records that a signal was delivered by kill(); it is
      // already true after the SIGTERM above, so it cannot tell us whether the
      // process exited. Escalate only while neither an exit code nor a
      // terminating signal has been recorded.
      // NOTE(review): assumes `child` is a Node `ChildProcess` — confirm.
      if (child.exitCode === null && child.signalCode === null) {
        child.kill("SIGKILL");
      }
    }, graceMs);
  }

  const cancelled = await setRunStatus(run.id, "cancelled", {
    finishedAt: new Date(),
    error: "Cancelled by control plane",
    errorCode: "cancelled",
  });

  await setWakeupStatus(run.wakeupRequestId, "cancelled", {
    finishedAt: new Date(),
    error: "Cancelled by control plane",
  });

  if (cancelled) {
    await appendRunEvent(cancelled, 1, {
      eventType: "lifecycle",
      stream: "system",
      level: "warn",
      message: "run cancelled",
    });
    await releaseIssueExecutionAndPromote(cancelled);
  }

  // The pending SIGKILL timer keeps its own reference to the child, so the
  // registry entry can be dropped right away.
  runningProcesses.delete(run.id);
  await finalizeAgentStatus(run.agentId, "cancelled");
  await startNextQueuedRunForAgent(run.agentId);
  return cancelled;
},
/**
 * Cancels every queued or running heartbeat run of an agent.
 *
 * For each run, in order: the run and its wakeup request are marked cancelled,
 * any tracked process is sent SIGTERM and removed from the process registry,
 * and the issue execution lock is released via
 * `releaseIssueExecutionAndPromote`.
 *
 * @param agentId - Agent whose active runs are cancelled.
 * @returns The number of runs that were selected for cancellation.
 */
cancelActiveForAgent: async (agentId: string) => {
  const activeRuns = await db
    .select()
    .from(heartbeatRuns)
    .where(
      and(
        eq(heartbeatRuns.agentId, agentId),
        inArray(heartbeatRuns.status, ["queued", "running"]),
      ),
    );

  for (const activeRun of activeRuns) {
    const runId = activeRun.id;

    await setRunStatus(runId, "cancelled", {
      finishedAt: new Date(),
      error: "Cancelled due to agent pause",
      errorCode: "cancelled",
    });
    await setWakeupStatus(activeRun.wakeupRequestId, "cancelled", {
      finishedAt: new Date(),
      error: "Cancelled due to agent pause",
    });

    const tracked = runningProcesses.get(runId);
    if (tracked) {
      tracked.child.kill("SIGTERM");
      runningProcesses.delete(runId);
    }

    // Receives the row as it was selected, i.e. before the status update above.
    await releaseIssueExecutionAndPromote(activeRun);
  }

  return activeRuns.length;
},
2026-02-19 09:09:40 -06:00
/**
 * Finds the agent's run with status `"running"`, preferring the one with the
 * latest `startedAt`.
 *
 * @param agentId - Agent to look up.
 * @returns The running run row, or `null` when the agent has none.
 */
getActiveRunForAgent: async (agentId: string) => {
  const isRunningForAgent = and(
    eq(heartbeatRuns.agentId, agentId),
    eq(heartbeatRuns.status, "running"),
  );
  const candidates = await db
    .select()
    .from(heartbeatRuns)
    .where(isRunningForAgent)
    .orderBy(desc(heartbeatRuns.startedAt))
    .limit(1);
  return candidates[0] ?? null;
},
Add server routes for companies, approvals, costs, and dashboard
New routes: companies, approvals, costs, dashboard, authz. New
services: companies, approvals, costs, dashboard, heartbeat,
activity-log. Add auth middleware and structured error handling.
Expand existing agent and issue routes with richer CRUD operations.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:07:27 -06:00
} ;
}