fix: harden routine dispatch and permissions

This commit is contained in:
dotta 2026-03-20 16:15:32 -05:00
parent e3c92a20f1
commit 99eb317600
4 changed files with 417 additions and 139 deletions

View file

@ -26,11 +26,13 @@ import type {
UpdateRoutineTrigger,
} from "@paperclipai/shared";
import { conflict, forbidden, notFound, unauthorized, unprocessable } from "../errors.js";
import { logger } from "../middleware/logger.js";
import { issueService } from "./issues.js";
import { secretService } from "./secrets.js";
import { parseCron, validateCron } from "./cron.js";
import { heartbeatService } from "./heartbeat.js";
import { queueIssueAssignmentWakeup, type IssueAssignmentWakeupDeps } from "./issue-assignment-wakeup.js";
import { logActivity } from "./activity-log.js";
const OPEN_ISSUE_STATUSES = ["backlog", "todo", "in_progress", "in_review", "blocked"];
const LIVE_HEARTBEAT_RUN_STATUSES = ["queued", "running"];
@ -386,8 +388,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
status: string;
issueId?: string | null;
nextRunAt?: Date | null;
}) {
await db
}, executor: Db = db) {
await executor
.update(routines)
.set({
lastTriggeredAt: input.triggeredAt,
@ -397,7 +399,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
.where(eq(routines.id, input.routineId));
if (input.triggerId) {
await db
await executor
.update(routineTriggers)
.set({
lastFiredAt: input.triggeredAt,
@ -409,8 +411,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
}
}
async function findLiveExecutionIssue(routine: typeof routines.$inferSelect) {
const executionBoundIssue = await db
async function findLiveExecutionIssue(routine: typeof routines.$inferSelect, executor: Db = db) {
const executionBoundIssue = await executor
.select()
.from(issues)
.innerJoin(
@ -434,7 +436,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
.then((rows) => rows[0]?.issues ?? null);
if (executionBoundIssue) return executionBoundIssue;
return db
return executor
.select()
.from(issues)
.innerJoin(
@ -459,8 +461,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
.then((rows) => rows[0]?.issues ?? null);
}
async function finalizeRun(runId: string, patch: Partial<typeof routineRuns.$inferInsert>) {
return db
async function finalizeRun(runId: string, patch: Partial<typeof routineRuns.$inferInsert>, executor: Db = db) {
return executor
.update(routineRuns)
.set({
...patch,
@ -509,150 +511,181 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
payload?: Record<string, unknown> | null;
idempotencyKey?: string | null;
}) {
if (input.idempotencyKey) {
const existing = await db
.select()
.from(routineRuns)
.where(
and(
eq(routineRuns.companyId, input.routine.companyId),
eq(routineRuns.routineId, input.routine.id),
eq(routineRuns.source, input.source),
eq(routineRuns.idempotencyKey, input.idempotencyKey),
input.trigger ? eq(routineRuns.triggerId, input.trigger.id) : isNull(routineRuns.triggerId),
),
)
.orderBy(desc(routineRuns.createdAt))
.limit(1)
.then((rows) => rows[0] ?? null);
if (existing) return existing;
}
const run = await db.transaction(async (tx) => {
const txDb = tx as unknown as Db;
await tx.execute(
sql`select id from ${routines} where ${routines.id} = ${input.routine.id} and ${routines.companyId} = ${input.routine.companyId} for update`,
);
const triggeredAt = new Date();
const [run] = await db
.insert(routineRuns)
.values({
companyId: input.routine.companyId,
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
source: input.source,
status: "received",
triggeredAt,
idempotencyKey: input.idempotencyKey ?? null,
triggerPayload: input.payload ?? null,
})
.returning();
const nextRunAt = input.trigger?.kind === "schedule" && input.trigger.cronExpression && input.trigger.timezone
? nextCronTickInTimeZone(input.trigger.cronExpression, input.trigger.timezone, triggeredAt)
: undefined;
try {
const activeIssue = await findLiveExecutionIssue(input.routine);
if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") {
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
const updated = await finalizeRun(run.id, {
status,
linkedIssueId: activeIssue.id,
coalescedIntoRunId: activeIssue.originRunId,
completedAt: triggeredAt,
});
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status,
issueId: activeIssue.id,
nextRunAt,
});
return updated ?? run;
if (input.idempotencyKey) {
const existing = await txDb
.select()
.from(routineRuns)
.where(
and(
eq(routineRuns.companyId, input.routine.companyId),
eq(routineRuns.routineId, input.routine.id),
eq(routineRuns.source, input.source),
eq(routineRuns.idempotencyKey, input.idempotencyKey),
input.trigger ? eq(routineRuns.triggerId, input.trigger.id) : isNull(routineRuns.triggerId),
),
)
.orderBy(desc(routineRuns.createdAt))
.limit(1)
.then((rows) => rows[0] ?? null);
if (existing) return existing;
}
let createdIssue;
const triggeredAt = new Date();
const [createdRun] = await txDb
.insert(routineRuns)
.values({
companyId: input.routine.companyId,
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
source: input.source,
status: "received",
triggeredAt,
idempotencyKey: input.idempotencyKey ?? null,
triggerPayload: input.payload ?? null,
})
.returning();
const nextRunAt = input.trigger?.kind === "schedule" && input.trigger.cronExpression && input.trigger.timezone
? nextCronTickInTimeZone(input.trigger.cronExpression, input.trigger.timezone, triggeredAt)
: undefined;
try {
createdIssue = await issueSvc.create(input.routine.companyId, {
projectId: input.routine.projectId,
goalId: input.routine.goalId,
parentId: input.routine.parentIssueId,
title: input.routine.title,
description: input.routine.description,
status: "todo",
priority: input.routine.priority,
assigneeAgentId: input.routine.assigneeAgentId,
originKind: "routine_execution",
originId: input.routine.id,
originRunId: run.id,
});
} catch (error) {
const isOpenExecutionConflict =
!!error &&
typeof error === "object" &&
"code" in error &&
(error as { code?: string }).code === "23505" &&
"constraint" in error &&
(error as { constraint?: string }).constraint === "issues_open_routine_execution_uq";
if (!isOpenExecutionConflict || input.routine.concurrencyPolicy === "always_enqueue") {
throw error;
const activeIssue = await findLiveExecutionIssue(input.routine, txDb);
if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") {
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
const updated = await finalizeRun(createdRun.id, {
status,
linkedIssueId: activeIssue.id,
coalescedIntoRunId: activeIssue.originRunId,
completedAt: triggeredAt,
}, txDb);
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status,
issueId: activeIssue.id,
nextRunAt,
}, txDb);
return updated ?? createdRun;
}
const existingIssue = await findLiveExecutionIssue(input.routine);
if (!existingIssue) throw error;
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
const updated = await finalizeRun(run.id, {
status,
linkedIssueId: existingIssue.id,
coalescedIntoRunId: existingIssue.originRunId,
completedAt: triggeredAt,
let createdIssue;
try {
createdIssue = await issueSvc.create(input.routine.companyId, {
projectId: input.routine.projectId,
goalId: input.routine.goalId,
parentId: input.routine.parentIssueId,
title: input.routine.title,
description: input.routine.description,
status: "todo",
priority: input.routine.priority,
assigneeAgentId: input.routine.assigneeAgentId,
originKind: "routine_execution",
originId: input.routine.id,
originRunId: createdRun.id,
});
} catch (error) {
const isOpenExecutionConflict =
!!error &&
typeof error === "object" &&
"code" in error &&
(error as { code?: string }).code === "23505" &&
"constraint" in error &&
(error as { constraint?: string }).constraint === "issues_open_routine_execution_uq";
if (!isOpenExecutionConflict || input.routine.concurrencyPolicy === "always_enqueue") {
throw error;
}
const existingIssue = await findLiveExecutionIssue(input.routine, txDb);
if (!existingIssue) throw error;
const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced";
const updated = await finalizeRun(createdRun.id, {
status,
linkedIssueId: existingIssue.id,
coalescedIntoRunId: existingIssue.originRunId,
completedAt: triggeredAt,
}, txDb);
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status,
issueId: existingIssue.id,
nextRunAt,
}, txDb);
return updated ?? createdRun;
}
// Keep the dispatch lock until the issue is linked to a queued heartbeat run.
await queueIssueAssignmentWakeup({
heartbeat,
issue: createdIssue,
reason: "issue_assigned",
mutation: "create",
contextSource: "routine.dispatch",
requestedByActorType: input.source === "schedule" ? "system" : undefined,
});
const updated = await finalizeRun(createdRun.id, {
status: "issue_created",
linkedIssueId: createdIssue.id,
}, txDb);
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status,
issueId: existingIssue.id,
status: "issue_created",
issueId: createdIssue.id,
nextRunAt,
});
return updated ?? run;
}, txDb);
return updated ?? createdRun;
} catch (error) {
const failureReason = error instanceof Error ? error.message : String(error);
const failed = await finalizeRun(createdRun.id, {
status: "failed",
failureReason,
completedAt: new Date(),
}, txDb);
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status: "failed",
nextRunAt,
}, txDb);
return failed ?? createdRun;
}
});
const updated = await finalizeRun(run.id, {
status: "issue_created",
linkedIssueId: createdIssue.id,
});
// Ensure the wake request is durably queued before reporting the routine run as created.
await queueIssueAssignmentWakeup({
heartbeat,
issue: createdIssue,
reason: "issue_assigned",
mutation: "create",
contextSource: "routine.dispatch",
requestedByActorType: input.source === "schedule" ? "system" : undefined,
});
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status: "issue_created",
issueId: createdIssue.id,
nextRunAt,
});
return updated ?? run;
} catch (error) {
const failureReason = error instanceof Error ? error.message : String(error);
const failed = await finalizeRun(run.id, {
status: "failed",
failureReason,
completedAt: new Date(),
});
await updateRoutineTouchedState({
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
triggeredAt,
status: "failed",
nextRunAt,
});
return failed ?? run;
if (input.source === "schedule" || input.source === "webhook") {
const actorId = input.source === "schedule" ? "routine-scheduler" : "routine-webhook";
try {
await logActivity(db, {
companyId: input.routine.companyId,
actorType: "system",
actorId,
action: "routine.run_triggered",
entityType: "routine_run",
entityId: run.id,
details: {
routineId: input.routine.id,
triggerId: input.trigger?.id ?? null,
source: run.source,
status: run.status,
},
});
} catch (err) {
logger.warn({ err, routineId: input.routine.id, runId: run.id }, "failed to log automated routine run");
}
}
return run;
}
return {