feat(routines): add workspace-aware routine runs

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-04-02 11:38:57 -05:00
parent 36376968af
commit 909e8cd4c8
38 changed files with 15468 additions and 250 deletions

View file

@ -21,10 +21,16 @@ import type {
RoutineRunSummary,
RoutineTrigger,
RoutineTriggerSecretMaterial,
RoutineVariable,
RunRoutine,
UpdateRoutine,
UpdateRoutineTrigger,
} from "@paperclipai/shared";
import {
interpolateRoutineTemplate,
stringifyRoutineVariableValue,
syncRoutineVariablesWithTemplate,
} from "@paperclipai/shared";
import { conflict, forbidden, notFound, unauthorized, unprocessable } from "../errors.js";
import { logger } from "../middleware/logger.js";
import { issueService } from "./issues.js";
@ -138,6 +144,145 @@ function normalizeWebhookTimestampMs(rawTimestamp: string) {
return parsed > 1e12 ? parsed : parsed * 1000;
}
function isPlainRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function parseBooleanVariableValue(name: string, raw: unknown) {
if (typeof raw === "boolean") return raw;
if (typeof raw === "number" && (raw === 0 || raw === 1)) return raw === 1;
if (typeof raw === "string") {
const normalized = raw.trim().toLowerCase();
if (["true", "1", "yes", "y", "on"].includes(normalized)) return true;
if (["false", "0", "no", "n", "off"].includes(normalized)) return false;
}
throw unprocessable(`Variable "${name}" must be a boolean`);
}
function parseNumberVariableValue(name: string, raw: unknown) {
if (typeof raw === "number" && Number.isFinite(raw)) return raw;
if (typeof raw === "string" && raw.trim().length > 0) {
const parsed = Number(raw);
if (Number.isFinite(parsed)) return parsed;
}
throw unprocessable(`Variable "${name}" must be a number`);
}
function normalizeRoutineVariableValue(variable: RoutineVariable, raw: unknown): string | number | boolean | null {
if (raw == null) return null;
if (variable.type === "boolean") return parseBooleanVariableValue(variable.name, raw);
if (variable.type === "number") return parseNumberVariableValue(variable.name, raw);
const normalized = stringifyRoutineVariableValue(raw);
if (variable.type === "select") {
if (!variable.options.includes(normalized)) {
throw unprocessable(`Variable "${variable.name}" must match one of: ${variable.options.join(", ")}`);
}
}
return normalized;
}
function isMissingRoutineVariableValue(value: string | number | boolean | null) {
return value == null || (typeof value === "string" && value.trim().length === 0);
}
function assertRoutineVariableDefinitions(variables: RoutineVariable[]) {
for (const variable of variables) {
if (variable.defaultValue != null) {
normalizeRoutineVariableValue(variable, variable.defaultValue);
}
if (variable.type === "select" && variable.options.length === 0) {
throw unprocessable(`Variable "${variable.name}" must define at least one option`);
}
}
}
function sanitizeRoutineVariableInputs(
variables: Array<Partial<RoutineVariable> & Pick<RoutineVariable, "name">> | null | undefined,
): RoutineVariable[] {
return (variables ?? []).map((variable) => ({
name: variable.name,
label: variable.label ?? null,
type: variable.type ?? "text",
defaultValue: variable.defaultValue ?? null,
required: variable.required ?? true,
options: variable.options ?? [],
}));
}
function assertScheduleCompatibleVariables(variables: RoutineVariable[]) {
const missingDefaults = variables
.filter((variable) => variable.required)
.filter((variable) => isMissingRoutineVariableValue(normalizeRoutineVariableValue(variable, variable.defaultValue)))
.map((variable) => variable.name);
if (missingDefaults.length > 0) {
throw unprocessable(
`Scheduled routines require defaults for required variables: ${missingDefaults.join(", ")}`,
);
}
}
function collectProvidedRoutineVariables(
source: "schedule" | "manual" | "api" | "webhook",
payload: Record<string, unknown> | null | undefined,
variables: Record<string, unknown> | null | undefined,
) {
const nestedVariables = isPlainRecord(payload) && isPlainRecord(payload.variables) ? payload.variables : {};
const provided = {
...(source === "webhook" && payload ? payload : {}),
...nestedVariables,
...(variables ?? {}),
};
delete provided.variables;
return provided;
}
function resolveRoutineVariableValues(
variables: RoutineVariable[],
input: {
source: "schedule" | "manual" | "api" | "webhook";
payload?: Record<string, unknown> | null;
variables?: Record<string, unknown> | null;
},
) {
if (variables.length === 0) return {} as Record<string, string | number | boolean>;
const provided = collectProvidedRoutineVariables(input.source, input.payload, input.variables);
const resolved: Record<string, string | number | boolean> = {};
const missing: string[] = [];
for (const variable of variables) {
const candidate = provided[variable.name] !== undefined ? provided[variable.name] : variable.defaultValue;
const normalized = normalizeRoutineVariableValue(variable, candidate);
if (normalized == null || (typeof normalized === "string" && normalized.trim().length === 0)) {
if (variable.required) missing.push(variable.name);
continue;
}
resolved[variable.name] = normalized;
}
if (missing.length > 0) {
throw unprocessable(`Missing routine variables: ${missing.join(", ")}`);
}
return resolved;
}
function mergeRoutineRunPayload(
payload: Record<string, unknown> | null | undefined,
variables: Record<string, string | number | boolean>,
) {
if (Object.keys(variables).length === 0) return payload ?? null;
if (!payload) return { variables };
const existingVariables = isPlainRecord(payload.variables) ? payload.variables : {};
return {
...payload,
variables: {
...existingVariables,
...variables,
},
};
}
export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeupDeps } = {}) {
const issueSvc = issueService(db);
const secretsSvc = secretService(db);
@ -515,8 +660,15 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
trigger: typeof routineTriggers.$inferSelect | null;
source: "schedule" | "manual" | "api" | "webhook";
payload?: Record<string, unknown> | null;
variables?: Record<string, unknown> | null;
idempotencyKey?: string | null;
executionWorkspaceId?: string | null;
executionWorkspacePreference?: string | null;
executionWorkspaceSettings?: Record<string, unknown> | null;
}) {
const resolvedVariables = resolveRoutineVariableValues(input.routine.variables ?? [], input);
const description = interpolateRoutineTemplate(input.routine.description, resolvedVariables);
const triggerPayload = mergeRoutineRunPayload(input.payload, resolvedVariables);
const run = await db.transaction(async (tx) => {
const txDb = tx as unknown as Db;
await tx.execute(
@ -553,7 +705,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
status: "received",
triggeredAt,
idempotencyKey: input.idempotencyKey ?? null,
triggerPayload: input.payload ?? null,
triggerPayload,
})
.returning();
@ -589,13 +741,16 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
goalId: input.routine.goalId,
parentId: input.routine.parentIssueId,
title: input.routine.title,
description: input.routine.description,
description,
status: "todo",
priority: input.routine.priority,
assigneeAgentId: input.routine.assigneeAgentId,
originKind: "routine_execution",
originId: input.routine.id,
originRunId: createdRun.id,
executionWorkspaceId: input.executionWorkspaceId ?? null,
executionWorkspacePreference: input.executionWorkspacePreference ?? null,
executionWorkspaceSettings: input.executionWorkspaceSettings ?? null,
});
} catch (error) {
const isOpenExecutionConflict =
@ -824,6 +979,11 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
await assertAssignableAgent(companyId, input.assigneeAgentId);
if (input.goalId) await assertGoal(companyId, input.goalId);
if (input.parentIssueId) await assertParentIssue(companyId, input.parentIssueId);
const variables = syncRoutineVariablesWithTemplate(
input.description,
sanitizeRoutineVariableInputs(input.variables),
);
assertRoutineVariableDefinitions(variables);
const [created] = await db
.insert(routines)
.values({
@ -838,6 +998,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
status: input.status,
concurrencyPolicy: input.concurrencyPolicy,
catchUpPolicy: input.catchUpPolicy,
variables,
createdByAgentId: actor.agentId ?? null,
createdByUserId: actor.userId ?? null,
updatedByAgentId: actor.agentId ?? null,
@ -852,10 +1013,31 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
if (!existing) return null;
const nextProjectId = patch.projectId ?? existing.projectId;
const nextAssigneeAgentId = patch.assigneeAgentId ?? existing.assigneeAgentId;
const nextDescription = patch.description === undefined ? existing.description : patch.description;
const nextVariables = syncRoutineVariablesWithTemplate(
nextDescription,
patch.variables === undefined ? existing.variables : sanitizeRoutineVariableInputs(patch.variables),
);
if (patch.projectId) await assertProject(existing.companyId, nextProjectId);
if (patch.assigneeAgentId) await assertAssignableAgent(existing.companyId, nextAssigneeAgentId);
if (patch.goalId) await assertGoal(existing.companyId, patch.goalId);
if (patch.parentIssueId) await assertParentIssue(existing.companyId, patch.parentIssueId);
assertRoutineVariableDefinitions(nextVariables);
const enabledScheduleTriggers = await db
.select({ id: routineTriggers.id })
.from(routineTriggers)
.where(
and(
eq(routineTriggers.routineId, existing.id),
eq(routineTriggers.kind, "schedule"),
eq(routineTriggers.enabled, true),
),
)
.limit(1)
.then((rows) => rows.length > 0);
if (enabledScheduleTriggers) {
assertScheduleCompatibleVariables(nextVariables);
}
const [updated] = await db
.update(routines)
.set({
@ -863,12 +1045,13 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
goalId: patch.goalId === undefined ? existing.goalId : patch.goalId,
parentIssueId: patch.parentIssueId === undefined ? existing.parentIssueId : patch.parentIssueId,
title: patch.title ?? existing.title,
description: patch.description === undefined ? existing.description : patch.description,
description: nextDescription,
assigneeAgentId: nextAssigneeAgentId,
priority: patch.priority ?? existing.priority,
status: patch.status ?? existing.status,
concurrencyPolicy: patch.concurrencyPolicy ?? existing.concurrencyPolicy,
catchUpPolicy: patch.catchUpPolicy ?? existing.catchUpPolicy,
variables: nextVariables,
updatedByAgentId: actor.agentId ?? null,
updatedByUserId: actor.userId ?? null,
updatedAt: new Date(),
@ -892,6 +1075,7 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
let nextRunAt: Date | null = null;
if (input.kind === "schedule") {
assertScheduleCompatibleVariables(routine.variables ?? []);
const timeZone = input.timezone || "UTC";
assertTimeZone(timeZone);
const error = validateCron(input.cronExpression);
@ -947,6 +1131,8 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
let timezone = existing.timezone;
if (existing.kind === "schedule") {
const routine = await getRoutineById(existing.routineId);
if (!routine) throw notFound("Routine not found");
if (patch.cronExpression !== undefined) {
if (patch.cronExpression == null) throw unprocessable("Scheduled triggers require cronExpression");
const error = validateCron(patch.cronExpression);
@ -961,6 +1147,9 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
if (cronExpression && timezone) {
nextRunAt = nextCronTickInTimeZone(cronExpression, timezone, new Date());
}
if ((patch.enabled ?? existing.enabled) === true) {
assertScheduleCompatibleVariables(routine.variables ?? []);
}
}
const [updated] = await db
@ -1034,7 +1223,12 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
trigger,
source: input.source,
payload: input.payload as Record<string, unknown> | null | undefined,
variables: input.variables as Record<string, unknown> | null | undefined,
idempotencyKey: input.idempotencyKey,
executionWorkspaceId: input.executionWorkspaceId ?? null,
executionWorkspacePreference: input.executionWorkspacePreference ?? null,
executionWorkspaceSettings:
(input.executionWorkspaceSettings as Record<string, unknown> | null | undefined) ?? null,
});
},
@ -1097,6 +1291,9 @@ export function routineService(db: Db, deps: { heartbeat?: IssueAssignmentWakeup
trigger,
source: "webhook",
payload: input.payload,
variables: isPlainRecord(input.payload) && isPlainRecord(input.payload.variables)
? input.payload.variables
: null,
idempotencyKey: input.idempotencyKey,
});
},