Add issue review policy and comment retry

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-04-06 08:40:38 -05:00
parent 4b39b0cc14
commit b3e0c31239
18 changed files with 1409 additions and 5 deletions

View file

@ -1835,6 +1835,210 @@ export function heartbeatService(db: Db) {
return updated;
}
async function patchRunIssueCommentStatus(
runId: string,
patch: Partial<Pick<typeof heartbeatRuns.$inferInsert, "issueCommentStatus" | "issueCommentSatisfiedByCommentId" | "issueCommentRetryQueuedAt">>,
) {
return db
.update(heartbeatRuns)
.set({ ...patch, updatedAt: new Date() })
.where(eq(heartbeatRuns.id, runId))
.returning()
.then((rows) => rows[0] ?? null);
}
async function findRunIssueComment(runId: string, companyId: string, issueId: string) {
return db
.select({
id: issueComments.id,
})
.from(issueComments)
.where(
and(
eq(issueComments.companyId, companyId),
eq(issueComments.issueId, issueId),
eq(issueComments.createdByRunId, runId),
),
)
.orderBy(desc(issueComments.createdAt), desc(issueComments.id))
.limit(1)
.then((rows) => rows[0] ?? null);
}
async function enqueueMissingIssueCommentRetry(
run: typeof heartbeatRuns.$inferSelect,
agent: typeof agents.$inferSelect,
issueId: string,
) {
const contextSnapshot = parseObject(run.contextSnapshot);
const taskKey = deriveTaskKeyWithHeartbeatFallback(contextSnapshot, null);
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
const retryContextSnapshot = {
...contextSnapshot,
retryOfRunId: run.id,
wakeReason: "missing_issue_comment",
retryReason: "missing_issue_comment",
missingIssueCommentForRunId: run.id,
};
const now = new Date();
const retryRun = await db.transaction(async (tx) => {
await tx.execute(
sql`select id from issues where company_id = ${run.companyId} and execution_run_id = ${run.id} for update`,
);
const issue = await tx
.select({ id: issues.id })
.from(issues)
.where(and(eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id)))
.then((rows) => rows[0] ?? null);
if (!issue) return null;
const wakeupRequest = await tx
.insert(agentWakeupRequests)
.values({
companyId: run.companyId,
agentId: run.agentId,
source: "automation",
triggerDetail: "system",
reason: "missing_issue_comment",
payload: {
issueId,
retryOfRunId: run.id,
retryReason: "missing_issue_comment",
},
status: "queued",
requestedByActorType: "system",
requestedByActorId: null,
updatedAt: now,
})
.returning()
.then((rows) => rows[0]);
const queuedRun = await tx
.insert(heartbeatRuns)
.values({
companyId: run.companyId,
agentId: run.agentId,
invocationSource: "automation",
triggerDetail: "system",
status: "queued",
wakeupRequestId: wakeupRequest.id,
contextSnapshot: retryContextSnapshot,
sessionIdBefore: sessionBefore,
retryOfRunId: run.id,
issueCommentStatus: "not_applicable",
updatedAt: now,
})
.returning()
.then((rows) => rows[0]);
await tx
.update(agentWakeupRequests)
.set({
runId: queuedRun.id,
updatedAt: now,
})
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
await tx
.update(issues)
.set({
executionRunId: queuedRun.id,
executionAgentNameKey: normalizeAgentNameKey(agent.name),
executionLockedAt: now,
updatedAt: now,
})
.where(eq(issues.id, issue.id));
await tx
.update(heartbeatRuns)
.set({
issueCommentStatus: "retry_queued",
issueCommentRetryQueuedAt: now,
updatedAt: now,
})
.where(eq(heartbeatRuns.id, run.id));
return queuedRun;
});
if (!retryRun) return null;
publishLiveEvent({
companyId: retryRun.companyId,
type: "heartbeat.run.queued",
payload: {
runId: retryRun.id,
agentId: retryRun.agentId,
invocationSource: retryRun.invocationSource,
triggerDetail: retryRun.triggerDetail,
wakeupRequestId: retryRun.wakeupRequestId,
},
});
return retryRun;
}
async function finalizeIssueCommentPolicy(
run: typeof heartbeatRuns.$inferSelect,
agent: typeof agents.$inferSelect,
) {
const contextSnapshot = parseObject(run.contextSnapshot);
const issueId = readNonEmptyString(contextSnapshot.issueId);
if (!issueId) {
if (run.issueCommentStatus !== "not_applicable") {
await patchRunIssueCommentStatus(run.id, {
issueCommentStatus: "not_applicable",
issueCommentSatisfiedByCommentId: null,
issueCommentRetryQueuedAt: null,
});
}
return { outcome: "not_applicable" as const, queuedRun: null };
}
const postedComment = await findRunIssueComment(run.id, run.companyId, issueId);
if (postedComment) {
await patchRunIssueCommentStatus(run.id, {
issueCommentStatus: "satisfied",
issueCommentSatisfiedByCommentId: postedComment.id,
issueCommentRetryQueuedAt: null,
});
return { outcome: "satisfied" as const, queuedRun: null };
}
if (readNonEmptyString(contextSnapshot.retryReason) === "missing_issue_comment") {
await patchRunIssueCommentStatus(run.id, {
issueCommentStatus: "retry_exhausted",
issueCommentSatisfiedByCommentId: null,
});
await appendRunEvent(run, await nextRunEventSeq(run.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "Run ended without an issue comment after one retry; no further comment wake will be queued",
});
return { outcome: "retry_exhausted" as const, queuedRun: null };
}
const queuedRun = await enqueueMissingIssueCommentRetry(run, agent, issueId);
if (queuedRun) {
await appendRunEvent(run, await nextRunEventSeq(run.id), {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "Run ended without an issue comment; queued one follow-up wake to require a comment",
});
return { outcome: "retry_queued" as const, queuedRun };
}
await patchRunIssueCommentStatus(run.id, {
issueCommentStatus: "retry_exhausted",
issueCommentSatisfiedByCommentId: null,
});
return { outcome: "retry_exhausted" as const, queuedRun: null };
}
async function enqueueProcessLossRetry(
run: typeof heartbeatRuns.$inferSelect,
agent: typeof agents.$inferSelect,
@ -3085,7 +3289,7 @@ export function heartbeatService(db: Db) {
try {
const issueComment = buildHeartbeatRunIssueComment(adapterResult.resultJson ?? null);
if (issueComment) {
await issuesSvc.addComment(issueId, issueComment, { agentId: agent.id });
await issuesSvc.addComment(issueId, issueComment, { agentId: agent.id, runId: finalizedRun.id });
}
} catch (err) {
await onLog(
@ -3094,6 +3298,7 @@ export function heartbeatService(db: Db) {
);
}
}
await finalizeIssueCommentPolicy(finalizedRun, agent);
await releaseIssueExecutionAndPromote(finalizedRun);
}
@ -3160,6 +3365,7 @@ export function heartbeatService(db: Db) {
level: "error",
message,
});
await finalizeIssueCommentPolicy(failedRun, agent);
await releaseIssueExecutionAndPromote(failedRun);
await updateRuntimeState(agent, failedRun, {
@ -3211,6 +3417,10 @@ export function heartbeatService(db: Db) {
level: "error",
message,
}).catch(() => undefined);
const failedAgent = await getAgent(run.agentId).catch(() => null);
if (failedAgent) {
await finalizeIssueCommentPolicy(failedRun, failedAgent).catch(() => undefined);
}
await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined);
}
// Ensure the agent is not left stuck in "running" if the inner catch handler's

View file

@ -0,0 +1,347 @@
import { randomUUID } from "node:crypto";
import type { IssueExecutionDecision, IssueExecutionPolicy, IssueExecutionStage, IssueExecutionStagePrincipal, IssueExecutionState } from "@paperclipai/shared";
import { issueExecutionPolicySchema, issueExecutionStateSchema } from "@paperclipai/shared";
import { unprocessable } from "../errors.js";
type AssigneeLike = {
assigneeAgentId?: string | null;
assigneeUserId?: string | null;
};
type IssueLike = AssigneeLike & {
status: string;
executionPolicy?: IssueExecutionPolicy | Record<string, unknown> | null;
executionState?: IssueExecutionState | Record<string, unknown> | null;
};
type ActorLike = {
agentId?: string | null;
userId?: string | null;
};
type RequestedAssigneePatch = {
assigneeAgentId?: string | null;
assigneeUserId?: string | null;
};
type TransitionInput = {
issue: IssueLike;
policy: IssueExecutionPolicy | null;
requestedStatus?: string;
requestedAssigneePatch: RequestedAssigneePatch;
actor: ActorLike;
commentBody?: string | null;
};
type TransitionResult = {
patch: Record<string, unknown>;
decision?: Pick<IssueExecutionDecision, "stageId" | "stageType" | "outcome" | "body">;
};
const COMPLETED_STATUS: IssueExecutionState["status"] = "completed";
const IDLE_STATUS: IssueExecutionState["status"] = "idle";
const PENDING_STATUS: IssueExecutionState["status"] = "pending";
const CHANGES_REQUESTED_STATUS: IssueExecutionState["status"] = "changes_requested";
export function normalizeIssueExecutionPolicy(input: unknown): IssueExecutionPolicy | null {
if (input == null) return null;
const parsed = issueExecutionPolicySchema.safeParse(input);
if (!parsed.success) {
throw unprocessable("Invalid execution policy", parsed.error.flatten());
}
const stages = parsed.data.stages
.map((stage) => {
const participants: IssueExecutionStage["participants"] = stage.participants
.map((participant) => ({
id: participant.id ?? randomUUID(),
type: participant.type,
agentId: participant.type === "agent" ? participant.agentId ?? null : null,
userId: participant.type === "user" ? participant.userId ?? null : null,
}))
.filter((participant) => (participant.type === "agent" ? Boolean(participant.agentId) : Boolean(participant.userId)));
const dedupedParticipants: IssueExecutionStage["participants"] = [];
const seen = new Set<string>();
for (const participant of participants) {
const key = participant.type === "agent" ? `agent:${participant.agentId}` : `user:${participant.userId}`;
if (seen.has(key)) continue;
seen.add(key);
dedupedParticipants.push(participant);
}
if (dedupedParticipants.length === 0) return null;
return {
id: stage.id ?? randomUUID(),
type: stage.type,
approvalsNeeded: 1,
participants: dedupedParticipants,
};
})
.filter((stage): stage is NonNullable<typeof stage> => stage !== null);
if (stages.length === 0) return null;
return {
mode: parsed.data.mode ?? "normal",
commentRequired: true,
stages,
};
}
export function parseIssueExecutionState(input: unknown): IssueExecutionState | null {
if (input == null) return null;
const parsed = issueExecutionStateSchema.safeParse(input);
if (!parsed.success) return null;
return parsed.data;
}
export function assigneePrincipal(input: AssigneeLike): IssueExecutionStagePrincipal | null {
if (input.assigneeAgentId) {
return { type: "agent", agentId: input.assigneeAgentId, userId: null };
}
if (input.assigneeUserId) {
return { type: "user", userId: input.assigneeUserId, agentId: null };
}
return null;
}
function actorPrincipal(actor: ActorLike): IssueExecutionStagePrincipal | null {
if (actor.agentId) return { type: "agent", agentId: actor.agentId, userId: null };
if (actor.userId) return { type: "user", userId: actor.userId, agentId: null };
return null;
}
function principalsEqual(a: IssueExecutionStagePrincipal | null, b: IssueExecutionStagePrincipal | null): boolean {
if (!a || !b) return false;
if (a.type !== b.type) return false;
return a.type === "agent" ? a.agentId === b.agentId : a.userId === b.userId;
}
function findStageById(policy: IssueExecutionPolicy, stageId: string | null | undefined) {
if (!stageId) return null;
return policy.stages.find((stage) => stage.id === stageId) ?? null;
}
function nextPendingStage(policy: IssueExecutionPolicy, state: IssueExecutionState | null) {
const completed = new Set(state?.completedStageIds ?? []);
return policy.stages.find((stage) => !completed.has(stage.id)) ?? null;
}
function selectStageParticipant(
stage: IssueExecutionStage,
opts?: {
preferred?: IssueExecutionStagePrincipal | null;
exclude?: IssueExecutionStagePrincipal | null;
},
): IssueExecutionStagePrincipal | null {
const participants = stage.participants.filter((participant) => !principalsEqual(participant, opts?.exclude ?? null));
if (participants.length === 0) return null;
if (opts?.preferred) {
const preferred = participants.find((participant) => principalsEqual(participant, opts.preferred ?? null));
if (preferred) return preferred;
}
const first = participants[0];
return first ? { type: first.type, agentId: first.agentId ?? null, userId: first.userId ?? null } : null;
}
function patchForPrincipal(principal: IssueExecutionStagePrincipal | null) {
if (!principal) {
return { assigneeAgentId: null, assigneeUserId: null };
}
return principal.type === "agent"
? { assigneeAgentId: principal.agentId ?? null, assigneeUserId: null }
: { assigneeAgentId: null, assigneeUserId: principal.userId ?? null };
}
function buildCompletedState(previous: IssueExecutionState | null, currentStage: IssueExecutionStage): IssueExecutionState {
const completedStageIds = Array.from(new Set([...(previous?.completedStageIds ?? []), currentStage.id]));
return {
status: COMPLETED_STATUS,
currentStageId: null,
currentStageIndex: null,
currentStageType: null,
currentParticipant: null,
returnAssignee: previous?.returnAssignee ?? null,
completedStageIds,
lastDecisionId: previous?.lastDecisionId ?? null,
lastDecisionOutcome: "approved",
};
}
function buildPendingState(input: {
previous: IssueExecutionState | null;
stage: IssueExecutionStage;
stageIndex: number;
participant: IssueExecutionStagePrincipal;
returnAssignee: IssueExecutionStagePrincipal | null;
}): IssueExecutionState {
return {
status: PENDING_STATUS,
currentStageId: input.stage.id,
currentStageIndex: input.stageIndex,
currentStageType: input.stage.type,
currentParticipant: input.participant,
returnAssignee: input.returnAssignee,
completedStageIds: input.previous?.completedStageIds ?? [],
lastDecisionId: input.previous?.lastDecisionId ?? null,
lastDecisionOutcome: input.previous?.lastDecisionOutcome ?? null,
};
}
function buildChangesRequestedState(previous: IssueExecutionState, currentStage: IssueExecutionStage): IssueExecutionState {
return {
...previous,
status: CHANGES_REQUESTED_STATUS,
currentStageId: currentStage.id,
currentStageType: currentStage.type,
lastDecisionOutcome: "changes_requested",
};
}
export function applyIssueExecutionPolicyTransition(input: TransitionInput): TransitionResult {
const patch: Record<string, unknown> = {};
const existingState = parseIssueExecutionState(input.issue.executionState);
const currentAssignee = assigneePrincipal(input.issue);
const actor = actorPrincipal(input.actor);
const explicitAssignee = assigneePrincipal(input.requestedAssigneePatch);
const currentStage = input.policy ? findStageById(input.policy, existingState?.currentStageId) : null;
const requestedStatus = input.requestedStatus;
if (!input.policy) {
if (existingState) {
patch.executionState = null;
if (input.issue.status === "in_review" && existingState.returnAssignee) {
patch.status = "in_progress";
Object.assign(patch, patchForPrincipal(existingState.returnAssignee));
}
}
return { patch };
}
if (
(input.issue.status === "done" || input.issue.status === "cancelled") &&
requestedStatus &&
requestedStatus !== "done" &&
requestedStatus !== "cancelled"
) {
patch.executionState = null;
return { patch };
}
if (currentStage && input.issue.status === "in_review") {
if (!principalsEqual(existingState?.currentParticipant ?? null, actor)) {
if (requestedStatus && requestedStatus !== "in_review") {
throw unprocessable("Only the active reviewer or approver can advance the current execution stage");
}
return { patch };
}
if (requestedStatus === "done") {
if (!input.commentBody?.trim()) {
throw unprocessable("Approving a review or approval stage requires a comment");
}
const approvedState = buildCompletedState(existingState, currentStage);
const nextStage = nextPendingStage(
input.policy,
{ ...approvedState, completedStageIds: approvedState.completedStageIds },
);
if (!nextStage) {
patch.executionState = approvedState;
return {
patch,
decision: {
stageId: currentStage.id,
stageType: currentStage.type,
outcome: "approved",
body: input.commentBody.trim(),
},
};
}
const participant = selectStageParticipant(nextStage, {
preferred: explicitAssignee,
exclude: existingState?.returnAssignee ?? null,
});
if (!participant) {
throw unprocessable(`No eligible ${nextStage.type} participant is configured for this issue`);
}
patch.status = "in_review";
Object.assign(patch, patchForPrincipal(participant));
patch.executionState = buildPendingState({
previous: approvedState,
stage: nextStage,
stageIndex: input.policy.stages.findIndex((stage) => stage.id === nextStage.id),
participant,
returnAssignee: existingState?.returnAssignee ?? currentAssignee,
});
return {
patch,
decision: {
stageId: currentStage.id,
stageType: currentStage.type,
outcome: "approved",
body: input.commentBody.trim(),
},
};
}
if (requestedStatus && requestedStatus !== "in_review") {
if (!input.commentBody?.trim()) {
throw unprocessable("Requesting changes requires a comment");
}
if (!existingState?.returnAssignee) {
throw unprocessable("This execution stage has no return assignee");
}
patch.status = "in_progress";
Object.assign(patch, patchForPrincipal(existingState.returnAssignee));
patch.executionState = buildChangesRequestedState(existingState, currentStage);
return {
patch,
decision: {
stageId: currentStage.id,
stageType: currentStage.type,
outcome: "changes_requested",
body: input.commentBody.trim(),
},
};
}
return { patch };
}
if (requestedStatus !== "done") {
return { patch };
}
const pendingStage =
existingState?.status === CHANGES_REQUESTED_STATUS && currentStage
? currentStage
: nextPendingStage(input.policy, existingState);
if (!pendingStage) return { patch };
const returnAssignee = existingState?.returnAssignee ?? currentAssignee;
const participant = selectStageParticipant(pendingStage, {
preferred:
existingState?.status === CHANGES_REQUESTED_STATUS
? explicitAssignee ?? existingState.currentParticipant ?? null
: explicitAssignee,
exclude: returnAssignee,
});
if (!participant) {
throw unprocessable(`No eligible ${pendingStage.type} participant is configured for this issue`);
}
patch.status = "in_review";
Object.assign(patch, patchForPrincipal(participant));
patch.executionState = buildPendingState({
previous: existingState,
stage: pendingStage,
stageIndex: input.policy.stages.findIndex((stage) => stage.id === pendingStage.id),
participant,
returnAssignee,
});
return { patch };
}