mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-19 04:00:38 +09:00
Dispatch assigned todo work during recovery sweeps (#4614)
## Thinking Path > - Paperclip orchestrates AI agents for autonomous companies. > - Agent assignments must reliably turn into heartbeat work without board operators manually nudging stuck tasks. > - The stranded-assignment recovery sweep already handles failed or lost runs. > - But assigned `todo` issues with no prior run could sit idle because there was nothing to retry or recover. > - This pull request dispatches those never-started assigned todos as normal assignment wakes. > - The benefit is that recovery fixes missed initial dispatches without creating unnecessary recovery issues. ## What Changed - Added an initial assigned-todo dispatch path to the recovery service when an assigned `todo` issue has no heartbeat run yet. - Reused invocation budget hard-stop checks before dispatching or requeueing recovery work. - Counted `assignmentDispatched` in startup/scheduled recovery logs. - Added heartbeat recovery regressions for first dispatch, duplicate queued wake prevention, budget-blocked skips, and paused-agent skips. ## Verification - `pnpm exec vitest run server/src/__tests__/heartbeat-process-recovery.test.ts` ## Risks - Low to medium risk: this changes liveness recovery behavior for assigned `todo` issues, but it stays on the existing assignment wake path and skips paused or budget-blocked agents. - No migrations. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex coding agent based on GPT-5, tool-enabled local repository and shell access, Paperclip heartbeat context. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge
This commit is contained in:
parent
7a9b3a6037
commit
68c37660f0
3 changed files with 331 additions and 1 deletions
|
|
@ -7,8 +7,10 @@ import {
|
||||||
agents,
|
agents,
|
||||||
agentRuntimeState,
|
agentRuntimeState,
|
||||||
agentWakeupRequests,
|
agentWakeupRequests,
|
||||||
|
budgetPolicies,
|
||||||
companySkills,
|
companySkills,
|
||||||
companies,
|
companies,
|
||||||
|
costEvents,
|
||||||
createDb,
|
createDb,
|
||||||
documentRevisions,
|
documentRevisions,
|
||||||
documents,
|
documents,
|
||||||
|
|
@ -306,6 +308,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||||
await db.delete(activityLog);
|
await db.delete(activityLog);
|
||||||
await db.delete(agentRuntimeState);
|
await db.delete(agentRuntimeState);
|
||||||
await db.delete(companySkills);
|
await db.delete(companySkills);
|
||||||
|
await db.delete(costEvents);
|
||||||
await db.delete(issueComments);
|
await db.delete(issueComments);
|
||||||
await db.delete(issueDocuments);
|
await db.delete(issueDocuments);
|
||||||
await db.delete(documentRevisions);
|
await db.delete(documentRevisions);
|
||||||
|
|
@ -336,6 +339,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await db.delete(agentWakeupRequests);
|
await db.delete(agentWakeupRequests);
|
||||||
|
await db.delete(budgetPolicies);
|
||||||
for (let attempt = 0; attempt < 5; attempt += 1) {
|
for (let attempt = 0; attempt < 5; attempt += 1) {
|
||||||
await db.delete(agentRuntimeState);
|
await db.delete(agentRuntimeState);
|
||||||
try {
|
try {
|
||||||
|
|
@ -586,6 +590,48 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||||
return { companyId, agentId, runId, wakeupRequestId, issueId, rootIssueId };
|
return { companyId, agentId, runId, wakeupRequestId, issueId, rootIssueId };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function seedAssignedTodoNoRunFixture(input?: {
|
||||||
|
agentStatus?: "paused" | "idle" | "running";
|
||||||
|
}) {
|
||||||
|
const companyId = randomUUID();
|
||||||
|
const agentId = randomUUID();
|
||||||
|
const issueId = randomUUID();
|
||||||
|
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
|
||||||
|
|
||||||
|
await db.insert(companies).values({
|
||||||
|
id: companyId,
|
||||||
|
name: "Paperclip",
|
||||||
|
issuePrefix,
|
||||||
|
requireBoardApprovalForNewAgents: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.insert(agents).values({
|
||||||
|
id: agentId,
|
||||||
|
companyId,
|
||||||
|
name: "CodexCoder",
|
||||||
|
role: "engineer",
|
||||||
|
status: input?.agentStatus ?? "idle",
|
||||||
|
adapterType: "codex_local",
|
||||||
|
adapterConfig: {},
|
||||||
|
runtimeConfig: {},
|
||||||
|
permissions: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.insert(issues).values({
|
||||||
|
id: issueId,
|
||||||
|
companyId,
|
||||||
|
title: "Assigned todo work that never received a heartbeat",
|
||||||
|
status: "todo",
|
||||||
|
priority: "medium",
|
||||||
|
assigneeAgentId: agentId,
|
||||||
|
assigneeUserId: null,
|
||||||
|
issueNumber: 1,
|
||||||
|
identifier: `${issuePrefix}-1`,
|
||||||
|
});
|
||||||
|
|
||||||
|
return { companyId, agentId, issueId };
|
||||||
|
}
|
||||||
|
|
||||||
async function expectStrandedRecoveryArtifacts(input: {
|
async function expectStrandedRecoveryArtifacts(input: {
|
||||||
companyId: string;
|
companyId: string;
|
||||||
agentId: string;
|
agentId: string;
|
||||||
|
|
@ -1173,6 +1219,176 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("dispatches assigned todo work with no prior run as a normal assignment wake", async () => {
|
||||||
|
const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture();
|
||||||
|
const heartbeat = heartbeatService(db);
|
||||||
|
|
||||||
|
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
|
expect(result.assignmentDispatched).toBe(1);
|
||||||
|
expect(result.dispatchRequeued).toBe(0);
|
||||||
|
expect(result.continuationRequeued).toBe(0);
|
||||||
|
expect(result.escalated).toBe(0);
|
||||||
|
expect(result.issueIds).toEqual([issueId]);
|
||||||
|
|
||||||
|
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
|
||||||
|
expect(wakeups).toHaveLength(1);
|
||||||
|
expect(wakeups[0]).toMatchObject({
|
||||||
|
companyId,
|
||||||
|
agentId,
|
||||||
|
source: "assignment",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: expect.objectContaining({
|
||||||
|
issueId,
|
||||||
|
mutation: "assigned_todo_liveness_dispatch",
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
|
||||||
|
expect(runs).toHaveLength(1);
|
||||||
|
expect(runs[0]?.retryOfRunId).toBeNull();
|
||||||
|
expect(runs[0]?.contextSnapshot).toMatchObject({
|
||||||
|
issueId,
|
||||||
|
taskId: issueId,
|
||||||
|
wakeReason: "issue_assigned",
|
||||||
|
source: "issue.assigned_todo_liveness_dispatch",
|
||||||
|
});
|
||||||
|
expect((runs[0]?.contextSnapshot as Record<string, unknown>)?.retryReason).toBeUndefined();
|
||||||
|
|
||||||
|
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
|
||||||
|
expect(issue?.status).toBe("todo");
|
||||||
|
|
||||||
|
const recoveryIssues = await db
|
||||||
|
.select()
|
||||||
|
.from(issues)
|
||||||
|
.where(and(eq(issues.companyId, companyId), eq(issues.originKind, "stranded_issue_recovery")));
|
||||||
|
expect(recoveryIssues).toHaveLength(0);
|
||||||
|
await expect(sourceBlockerIssueIds(companyId, issueId)).resolves.toEqual([]);
|
||||||
|
|
||||||
|
const comments = await db.select().from(issueComments).where(eq(issueComments.issueId, issueId));
|
||||||
|
expect(comments).toHaveLength(0);
|
||||||
|
|
||||||
|
if (runs[0]?.id) {
|
||||||
|
await waitForRunToSettle(heartbeat, runs[0].id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not duplicate initial assigned todo dispatch when a queued wake already exists", async () => {
|
||||||
|
const { companyId, agentId, issueId } = await seedAssignedTodoNoRunFixture();
|
||||||
|
await db.insert(agentWakeupRequests).values({
|
||||||
|
companyId,
|
||||||
|
agentId,
|
||||||
|
source: "assignment",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: { issueId, mutation: "assigned_todo_liveness_dispatch" },
|
||||||
|
status: "queued",
|
||||||
|
});
|
||||||
|
const heartbeat = heartbeatService(db);
|
||||||
|
|
||||||
|
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
|
expect(result.assignmentDispatched).toBe(0);
|
||||||
|
expect(result.dispatchRequeued).toBe(0);
|
||||||
|
expect(result.continuationRequeued).toBe(0);
|
||||||
|
expect(result.escalated).toBe(0);
|
||||||
|
expect(result.skipped).toBe(1);
|
||||||
|
expect(result.issueIds).toEqual([]);
|
||||||
|
|
||||||
|
const wakeups = await db.select().from(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, agentId));
|
||||||
|
expect(wakeups).toHaveLength(1);
|
||||||
|
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
|
||||||
|
expect(runs).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips budget-blocked assigned todo work with no prior run and continues the sweep", async () => {
|
||||||
|
const blocked = await seedAssignedTodoNoRunFixture();
|
||||||
|
const unblocked = await seedAssignedTodoNoRunFixture();
|
||||||
|
await db.insert(budgetPolicies).values({
|
||||||
|
companyId: blocked.companyId,
|
||||||
|
scopeType: "agent",
|
||||||
|
scopeId: blocked.agentId,
|
||||||
|
metric: "billed_cents",
|
||||||
|
windowKind: "calendar_month_utc",
|
||||||
|
amount: 1,
|
||||||
|
hardStopEnabled: true,
|
||||||
|
isActive: true,
|
||||||
|
});
|
||||||
|
await db.insert(costEvents).values({
|
||||||
|
companyId: blocked.companyId,
|
||||||
|
agentId: blocked.agentId,
|
||||||
|
issueId: blocked.issueId,
|
||||||
|
provider: "test",
|
||||||
|
biller: "test",
|
||||||
|
billingType: "tokens",
|
||||||
|
model: "test-model",
|
||||||
|
costCents: 1,
|
||||||
|
occurredAt: new Date(),
|
||||||
|
});
|
||||||
|
const heartbeat = heartbeatService(db);
|
||||||
|
|
||||||
|
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
|
expect(result.assignmentDispatched).toBe(1);
|
||||||
|
expect(result.dispatchRequeued).toBe(0);
|
||||||
|
expect(result.continuationRequeued).toBe(0);
|
||||||
|
expect(result.escalated).toBe(0);
|
||||||
|
expect(result.skipped).toBe(1);
|
||||||
|
expect(result.issueIds).toEqual([unblocked.issueId]);
|
||||||
|
|
||||||
|
const blockedWakeups = await db
|
||||||
|
.select()
|
||||||
|
.from(agentWakeupRequests)
|
||||||
|
.where(eq(agentWakeupRequests.agentId, blocked.agentId));
|
||||||
|
expect(blockedWakeups).toHaveLength(0);
|
||||||
|
const blockedRuns = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, blocked.agentId));
|
||||||
|
expect(blockedRuns).toHaveLength(0);
|
||||||
|
|
||||||
|
const blockedIssue = await db
|
||||||
|
.select()
|
||||||
|
.from(issues)
|
||||||
|
.where(eq(issues.id, blocked.issueId))
|
||||||
|
.then((rows) => rows[0] ?? null);
|
||||||
|
expect(blockedIssue?.status).toBe("todo");
|
||||||
|
|
||||||
|
const unblockedWakeups = await db
|
||||||
|
.select()
|
||||||
|
.from(agentWakeupRequests)
|
||||||
|
.where(eq(agentWakeupRequests.agentId, unblocked.agentId));
|
||||||
|
expect(unblockedWakeups).toHaveLength(1);
|
||||||
|
expect(unblockedWakeups[0]).toMatchObject({
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: expect.objectContaining({
|
||||||
|
issueId: unblocked.issueId,
|
||||||
|
mutation: "assigned_todo_liveness_dispatch",
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
const unblockedRuns = await db
|
||||||
|
.select()
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.agentId, unblocked.agentId));
|
||||||
|
expect(unblockedRuns).toHaveLength(1);
|
||||||
|
if (unblockedRuns[0]?.id) {
|
||||||
|
await waitForRunToSettle(heartbeat, unblockedRuns[0].id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not dispatch assigned todo work with no prior run when the agent is paused", async () => {
|
||||||
|
const { agentId, issueId } = await seedAssignedTodoNoRunFixture({ agentStatus: "paused" });
|
||||||
|
const heartbeat = heartbeatService(db);
|
||||||
|
|
||||||
|
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
|
expect(result.assignmentDispatched).toBe(0);
|
||||||
|
expect(result.dispatchRequeued).toBe(0);
|
||||||
|
expect(result.continuationRequeued).toBe(0);
|
||||||
|
expect(result.escalated).toBe(0);
|
||||||
|
expect(result.skipped).toBe(1);
|
||||||
|
expect(result.issueIds).toEqual([]);
|
||||||
|
|
||||||
|
const issue = await db.select().from(issues).where(eq(issues.id, issueId)).then((rows) => rows[0] ?? null);
|
||||||
|
expect(issue?.status).toBe("todo");
|
||||||
|
const runs = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.agentId, agentId));
|
||||||
|
expect(runs).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => {
|
it("re-enqueues assigned todo work when the last issue run died and no wake remains", async () => {
|
||||||
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
const { agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||||
status: "todo",
|
status: "todo",
|
||||||
|
|
@ -1181,6 +1397,7 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||||
const heartbeat = heartbeatService(db);
|
const heartbeat = heartbeatService(db);
|
||||||
|
|
||||||
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
|
expect(result.assignmentDispatched).toBe(0);
|
||||||
expect(result.dispatchRequeued).toBe(1);
|
expect(result.dispatchRequeued).toBe(1);
|
||||||
expect(result.continuationRequeued).toBe(0);
|
expect(result.continuationRequeued).toBe(0);
|
||||||
expect(result.escalated).toBe(0);
|
expect(result.escalated).toBe(0);
|
||||||
|
|
@ -1200,6 +1417,42 @@ describeEmbeddedPostgres("heartbeat orphaned process recovery", () => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("still re-enqueues stranded assigned todo recovery when an old queued wake exists", async () => {
|
||||||
|
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||||
|
status: "todo",
|
||||||
|
runStatus: "failed",
|
||||||
|
});
|
||||||
|
await db.insert(agentWakeupRequests).values({
|
||||||
|
companyId,
|
||||||
|
agentId,
|
||||||
|
source: "assignment",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: { issueId },
|
||||||
|
status: "queued",
|
||||||
|
});
|
||||||
|
const heartbeat = heartbeatService(db);
|
||||||
|
|
||||||
|
const result = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
|
expect(result.assignmentDispatched).toBe(0);
|
||||||
|
expect(result.dispatchRequeued).toBe(1);
|
||||||
|
expect(result.continuationRequeued).toBe(0);
|
||||||
|
expect(result.escalated).toBe(0);
|
||||||
|
expect(result.issueIds).toEqual([issueId]);
|
||||||
|
|
||||||
|
const runs = await db
|
||||||
|
.select()
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.agentId, agentId));
|
||||||
|
expect(runs).toHaveLength(2);
|
||||||
|
|
||||||
|
const retryRun = runs.find((row) => row.id !== runId);
|
||||||
|
expect((retryRun?.contextSnapshot as Record<string, unknown>)?.retryReason).toBe("assignment_recovery");
|
||||||
|
if (retryRun) {
|
||||||
|
await waitForRunToSettle(heartbeat, retryRun.id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it("blocks assigned todo work after the one automatic dispatch recovery was already used", async () => {
|
it("blocks assigned todo work after the one automatic dispatch recovery was already used", async () => {
|
||||||
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
const { companyId, agentId, issueId, runId } = await seedStrandedIssueFixture({
|
||||||
status: "todo",
|
status: "todo",
|
||||||
|
|
|
||||||
|
|
@ -682,6 +682,7 @@ export async function startServer(): Promise<StartedServer> {
|
||||||
const reconciled = await heartbeat.reconcileStrandedAssignedIssues();
|
const reconciled = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
if (
|
if (
|
||||||
promotion.promoted > 0 ||
|
promotion.promoted > 0 ||
|
||||||
|
reconciled.assignmentDispatched > 0 ||
|
||||||
reconciled.dispatchRequeued > 0 ||
|
reconciled.dispatchRequeued > 0 ||
|
||||||
reconciled.continuationRequeued > 0 ||
|
reconciled.continuationRequeued > 0 ||
|
||||||
reconciled.escalated > 0
|
reconciled.escalated > 0
|
||||||
|
|
@ -740,6 +741,7 @@ export async function startServer(): Promise<StartedServer> {
|
||||||
const reconciled = await heartbeat.reconcileStrandedAssignedIssues();
|
const reconciled = await heartbeat.reconcileStrandedAssignedIssues();
|
||||||
if (
|
if (
|
||||||
promotion.promoted > 0 ||
|
promotion.promoted > 0 ||
|
||||||
|
reconciled.assignmentDispatched > 0 ||
|
||||||
reconciled.dispatchRequeued > 0 ||
|
reconciled.dispatchRequeued > 0 ||
|
||||||
reconciled.continuationRequeued > 0 ||
|
reconciled.continuationRequeued > 0 ||
|
||||||
reconciled.escalated > 0
|
reconciled.escalated > 0
|
||||||
|
|
|
||||||
|
|
@ -343,6 +343,21 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||||
return Boolean(run || deferredWake);
|
return Boolean(run || deferredWake);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function hasQueuedIssueWake(companyId: string, issueId: string) {
|
||||||
|
return db
|
||||||
|
.select({ id: agentWakeupRequests.id })
|
||||||
|
.from(agentWakeupRequests)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(agentWakeupRequests.companyId, companyId),
|
||||||
|
eq(agentWakeupRequests.status, "queued"),
|
||||||
|
sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issueId}`,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.limit(1)
|
||||||
|
.then((rows) => Boolean(rows[0]));
|
||||||
|
}
|
||||||
|
|
||||||
async function enqueueStrandedIssueRecovery(input: {
|
async function enqueueStrandedIssueRecovery(input: {
|
||||||
issueId: string;
|
issueId: string;
|
||||||
agentId: string;
|
agentId: string;
|
||||||
|
|
@ -386,6 +401,34 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||||
return queued;
|
return queued;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function enqueueInitialAssignedTodoDispatch(issue: typeof issues.$inferSelect, agentId: string) {
|
||||||
|
return deps.enqueueWakeup(agentId, {
|
||||||
|
source: "assignment",
|
||||||
|
triggerDetail: "system",
|
||||||
|
reason: "issue_assigned",
|
||||||
|
payload: {
|
||||||
|
issueId: issue.id,
|
||||||
|
mutation: "assigned_todo_liveness_dispatch",
|
||||||
|
},
|
||||||
|
requestedByActorType: "system",
|
||||||
|
requestedByActorId: null,
|
||||||
|
contextSnapshot: {
|
||||||
|
issueId: issue.id,
|
||||||
|
taskId: issue.id,
|
||||||
|
wakeReason: "issue_assigned",
|
||||||
|
source: "issue.assigned_todo_liveness_dispatch",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function isInvocationBudgetBlocked(issue: typeof issues.$inferSelect, agentId: string) {
|
||||||
|
const budgetBlock = await budgets.getInvocationBlock(issue.companyId, agentId, {
|
||||||
|
issueId: issue.id,
|
||||||
|
projectId: issue.projectId,
|
||||||
|
});
|
||||||
|
return Boolean(budgetBlock);
|
||||||
|
}
|
||||||
|
|
||||||
async function reconcileUnassignedBlockingIssues() {
|
async function reconcileUnassignedBlockingIssues() {
|
||||||
const candidates = await db
|
const candidates = await db
|
||||||
.select({
|
.select({
|
||||||
|
|
@ -1526,6 +1569,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||||
);
|
);
|
||||||
|
|
||||||
const result = {
|
const result = {
|
||||||
|
assignmentDispatched: 0,
|
||||||
dispatchRequeued: 0,
|
dispatchRequeued: 0,
|
||||||
continuationRequeued: 0,
|
continuationRequeued: 0,
|
||||||
orphanBlockersAssigned: 0,
|
orphanBlockersAssigned: 0,
|
||||||
|
|
@ -1574,7 +1618,28 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||||
}
|
}
|
||||||
|
|
||||||
if (issue.status === "todo") {
|
if (issue.status === "todo") {
|
||||||
if (!latestRun || latestRun.status === "succeeded") {
|
if (!latestRun) {
|
||||||
|
if (await hasQueuedIssueWake(issue.companyId, issue.id)) {
|
||||||
|
result.skipped += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (await isInvocationBudgetBlocked(issue, agentId)) {
|
||||||
|
result.skipped += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const queued = await enqueueInitialAssignedTodoDispatch(issue, agentId);
|
||||||
|
if (queued) {
|
||||||
|
result.assignmentDispatched += 1;
|
||||||
|
result.issueIds.push(issue.id);
|
||||||
|
} else {
|
||||||
|
result.skipped += 1;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (latestRun.status === "succeeded") {
|
||||||
result.skipped += 1;
|
result.skipped += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -1599,6 +1664,11 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (await isInvocationBudgetBlocked(issue, agentId)) {
|
||||||
|
result.skipped += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
const queued = await enqueueStrandedIssueRecovery({
|
const queued = await enqueueStrandedIssueRecovery({
|
||||||
issueId: issue.id,
|
issueId: issue.id,
|
||||||
agentId,
|
agentId,
|
||||||
|
|
@ -1640,6 +1710,11 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (await isInvocationBudgetBlocked(issue, agentId)) {
|
||||||
|
result.skipped += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
const queued = await enqueueStrandedIssueRecovery({
|
const queued = await enqueueStrandedIssueRecovery({
|
||||||
issueId: issue.id,
|
issueId: issue.id,
|
||||||
agentId,
|
agentId,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue