diff --git a/server/src/__tests__/routines-routes.test.ts b/server/src/__tests__/routines-routes.test.ts index 3c7bada1..1e8806a8 100644 --- a/server/src/__tests__/routines-routes.test.ts +++ b/server/src/__tests__/routines-routes.test.ts @@ -281,6 +281,29 @@ describe("routine routes", () => { expect(mockRoutineService.runRoutine).not.toHaveBeenCalled(); }); + it("passes the board actor through when manually running a routine", async () => { + mockAccessService.canUser.mockResolvedValue(true); + const app = await createApp({ + type: "board", + userId: "board-user", + source: "session", + isInstanceAdmin: false, + companyIds: [companyId], + }); + + const res = await request(app) + .post(`/api/routines/${routineId}/run`) + .send({}); + + expect(res.status).toBe(202); + expect(mockRoutineService.runRoutine).toHaveBeenCalledWith(routineId, { + source: "manual", + }, { + agentId: null, + userId: "board-user", + }); + }); + it("allows routine creation when the board user has tasks:assign", async () => { mockAccessService.canUser.mockResolvedValue(true); const app = await createApp({ diff --git a/server/src/__tests__/routines-service.test.ts b/server/src/__tests__/routines-service.test.ts index 50b0e4e8..e59dce51 100644 --- a/server/src/__tests__/routines-service.test.ts +++ b/server/src/__tests__/routines-service.test.ts @@ -11,6 +11,8 @@ import { executionWorkspaces, heartbeatRuns, instanceSettings, + issueInboxArchives, + issueReadStates, issues, projectWorkspaces, projects, @@ -46,6 +48,8 @@ describeEmbeddedPostgres("routine service live-execution coalescing", () => { afterEach(async () => { await db.delete(activityLog); + await db.delete(issueInboxArchives); + await db.delete(issueReadStates); await db.delete(routineRuns); await db.delete(routineTriggers); await db.delete(routines); @@ -269,6 +273,36 @@ describeEmbeddedPostgres("routine service live-execution coalescing", () => { ]); }); + it("records the manual board runner on fresh routine issues so they appear in that user's inbox", async () => { + const { companyId, agentId, issueSvc, routine, svc } = await seedFixture(); + const userId = randomUUID(); + + const run = await svc.runRoutine(routine.id, { source: "manual" }, { userId }); + + expect(run.status).toBe("issue_created"); + expect(run.linkedIssueId).toBeTruthy(); + const [createdIssue] = await db + .select({ + id: issues.id, + assigneeAgentId: issues.assigneeAgentId, + createdByUserId: issues.createdByUserId, + }) + .from(issues) + .where(eq(issues.id, run.linkedIssueId!)); + expect(createdIssue).toMatchObject({ + id: run.linkedIssueId, + assigneeAgentId: agentId, + createdByUserId: userId, + }); + + const inboxIssues = await issueSvc.list(companyId, { + touchedByUserId: userId, + inboxArchivedByUserId: userId, + includeRoutineExecutions: true, + }); + expect(inboxIssues.map((issue) => issue.id)).toContain(run.linkedIssueId); + }); + it("waits for the assignee wakeup to be queued before returning the routine run", async () => { let wakeupResolved = false; const { routine, svc } = await seedFixture({ @@ -349,6 +383,166 @@ describeEmbeddedPostgres("routine service live-execution coalescing", () => { expect(routineIssues[0]?.id).toBe(previousIssue.id); }); + it("touches a coalesced routine issue for the manual runner's inbox", async () => { + const { agentId, companyId, issueSvc, routine, svc } = await seedFixture(); + const userId = randomUUID(); + const previousRunId = randomUUID(); + const liveHeartbeatRunId = randomUUID(); + const previousIssue = await issueSvc.create(companyId, { + projectId: routine.projectId, + title: routine.title, + description: routine.description, + status: "in_progress", + priority: routine.priority, + assigneeAgentId: routine.assigneeAgentId, + originKind: "routine_execution", + originId: routine.id, + originRunId: previousRunId, + }); + + await db.insert(routineRuns).values({ + id: previousRunId, + companyId, + routineId: routine.id, + triggerId: null, + source: "manual", + status: "issue_created", + triggeredAt: new Date("2026-03-20T12:00:00.000Z"), + linkedIssueId: previousIssue.id, + }); + await db.insert(heartbeatRuns).values({ + id: liveHeartbeatRunId, + companyId, + agentId, + invocationSource: "assignment", + triggerDetail: "system", + status: "running", + contextSnapshot: { issueId: previousIssue.id }, + startedAt: new Date("2026-03-20T12:01:00.000Z"), + }); + await db + .update(issues) + .set({ + checkoutRunId: liveHeartbeatRunId, + executionRunId: liveHeartbeatRunId, + executionLockedAt: new Date("2026-03-20T12:01:00.000Z"), + }) + .where(eq(issues.id, previousIssue.id)); + await db.insert(issueInboxArchives).values({ + companyId, + issueId: previousIssue.id, + userId, + archivedAt: new Date("2026-03-20T12:02:00.000Z"), + }); + + const run = await svc.runRoutine(routine.id, { source: "manual" }, { userId }); + + expect(run.status).toBe("coalesced"); + expect(run.linkedIssueId).toBe(previousIssue.id); + await expect( + db.select().from(issueInboxArchives).where(eq(issueInboxArchives.issueId, previousIssue.id)), + ).resolves.toHaveLength(0); + await expect( + db.select().from(issueReadStates).where(eq(issueReadStates.issueId, previousIssue.id)), + ).resolves.toEqual([ + expect.objectContaining({ + companyId, + issueId: previousIssue.id, + userId, + }), + ]); + + const inboxIssues = await issueSvc.list(companyId, { + touchedByUserId: userId, + inboxArchivedByUserId: userId, + includeRoutineExecutions: true, + }); + expect(inboxIssues.map((issue) => issue.id)).toContain(previousIssue.id); + }); + + it("touches a skipped active routine issue for the manual runner's inbox", async () => { + const { agentId, companyId, issueSvc, routine, svc } = await seedFixture(); + const userId = randomUUID(); + const previousRunId = randomUUID(); + const liveHeartbeatRunId = randomUUID(); + + await db + .update(routines) + .set({ concurrencyPolicy: "skip_if_active" }) + .where(eq(routines.id, routine.id)); + + const previousIssue = await issueSvc.create(companyId, { + projectId: routine.projectId, + title: routine.title, + description: routine.description, + status: "in_progress", + priority: routine.priority, + assigneeAgentId: routine.assigneeAgentId, + originKind: "routine_execution", + originId: routine.id, + originRunId: previousRunId, + }); + + await db.insert(routineRuns).values({ + id: previousRunId, + companyId, + routineId: routine.id, + triggerId: null, + source: "manual", + status: "issue_created", + triggeredAt: new Date("2026-03-20T12:00:00.000Z"), + linkedIssueId: previousIssue.id, + }); + await db.insert(heartbeatRuns).values({ + id: liveHeartbeatRunId, + companyId, + agentId, + invocationSource: "assignment", + triggerDetail: "system", + status: "running", + contextSnapshot: { issueId: previousIssue.id }, + startedAt: new Date("2026-03-20T12:01:00.000Z"), + }); + await db + .update(issues) + .set({ + checkoutRunId: liveHeartbeatRunId, + executionRunId: liveHeartbeatRunId, + executionLockedAt: new Date("2026-03-20T12:01:00.000Z"), + }) + .where(eq(issues.id, previousIssue.id)); + await db.insert(issueInboxArchives).values({ + companyId, + issueId: previousIssue.id, + userId, + archivedAt: new Date("2026-03-20T12:02:00.000Z"), + }); + + const run = await svc.runRoutine(routine.id, { source: "manual" }, { userId }); + + expect(run.status).toBe("skipped"); + expect(run.linkedIssueId).toBe(previousIssue.id); + await expect( + db.select().from(issueInboxArchives).where(eq(issueInboxArchives.issueId, previousIssue.id)), + ).resolves.toHaveLength(0); + await expect( + db.select().from(issueReadStates).where(eq(issueReadStates.issueId, previousIssue.id)), + ).resolves.toEqual([ + expect.objectContaining({ + companyId, + issueId: previousIssue.id, + userId, + }), + ]); + + const inboxIssues = await issueSvc.list(companyId, { + touchedByUserId: userId, + inboxArchivedByUserId: userId, + includeRoutineExecutions: true, + }); + expect(inboxIssues.map((issue) => issue.id)).toContain(previousIssue.id); + }); + it("does not coalesce live routine runs with different resolved variables", async () => { const { companyId, agentId, projectId, svc } = await seedFixture(); const variableRoutine = await svc.create( diff --git a/server/src/routes/routines.ts b/server/src/routes/routines.ts index bbddd9f0..0d78c066 100644 --- a/server/src/routes/routines.ts +++ b/server/src/routes/routines.ts @@ -283,7 +283,10 @@ export function routineRoutes( return; } await assertBoardCanAssignTasks(req, routine.companyId); - const run = await svc.runRoutine(routine.id, req.body); + const run = await svc.runRoutine(routine.id, req.body, { + agentId: req.actor.type === "agent" ? req.actor.agentId : null, + userId: req.actor.type === "board" ? req.actor.userId ?? null : null, + }); const actor = getActorInfo(req); await logActivity(db, { companyId: routine.companyId, diff --git a/server/src/services/routines.ts b/server/src/services/routines.ts index eee9ebf0..f12e275c 100644 --- a/server/src/services/routines.ts +++ b/server/src/services/routines.ts @@ -7,6 +7,8 @@ import { executionWorkspaces, goals, heartbeatRuns, + issueInboxArchives, + issueReadStates, issues, projects, routineRuns, @@ -758,6 +760,43 @@ export function routineService( return value; } + async function touchIssueForUserInbox( + executor: Db, + input: { + companyId: string; + issueId: string; + userId: string; + touchedAt: Date; + }, + ) { + await executor + .insert(issueReadStates) + .values({ + companyId: input.companyId, + issueId: input.issueId, + userId: input.userId, + lastReadAt: input.touchedAt, + updatedAt: input.touchedAt, + }) + .onConflictDoUpdate({ + target: [issueReadStates.companyId, issueReadStates.issueId, issueReadStates.userId], + set: { + lastReadAt: input.touchedAt, + updatedAt: input.touchedAt, + }, + }); + + await executor + .delete(issueInboxArchives) + .where( + and( + eq(issueInboxArchives.companyId, input.companyId), + eq(issueInboxArchives.issueId, input.issueId), + eq(issueInboxArchives.userId, input.userId), + ), + ); + } + async function dispatchRoutineRun(input: { routine: typeof routines.$inferSelect; trigger: typeof routineTriggers.$inferSelect | null; @@ -770,6 +809,7 @@ export function routineService( executionWorkspaceId?: string | null; executionWorkspacePreference?: string | null; executionWorkspaceSettings?: Record | null; + actor?: Actor; }) { const projectId = input.projectId ?? input.routine.projectId ?? null; const assigneeAgentId = input.assigneeAgentId ?? input.routine.assigneeAgentId ?? null; @@ -840,6 +880,7 @@ export function routineService( } const triggeredAt = new Date(); + const manualRunnerUserId = input.source === "manual" ? input.actor?.userId ?? null : null; const [createdRun] = await txDb .insert(routineRuns) .values({ @@ -864,6 +905,14 @@ export function routineService( const activeIssue = await findLiveExecutionIssue(input.routine, txDb, dispatchFingerprint); if (activeIssue && input.routine.concurrencyPolicy !== "always_enqueue") { const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced"; + if (manualRunnerUserId) { + await touchIssueForUserInbox(txDb, { + companyId: input.routine.companyId, + issueId: activeIssue.id, + userId: manualRunnerUserId, + touchedAt: triggeredAt, + }); + } const updated = await finalizeRun(createdRun.id, { status, linkedIssueId: activeIssue.id, @@ -891,6 +940,8 @@ export function routineService( status: "todo", priority: input.routine.priority, assigneeAgentId, + createdByAgentId: input.source === "manual" ? input.actor?.agentId ?? null : null, + createdByUserId: manualRunnerUserId, originKind: "routine_execution", originId: input.routine.id, originRunId: createdRun.id, @@ -914,6 +965,14 @@ export function routineService( const existingIssue = await findLiveExecutionIssue(input.routine, txDb, dispatchFingerprint); if (!existingIssue) throw error; const status = input.routine.concurrencyPolicy === "skip_if_active" ? "skipped" : "coalesced"; + if (manualRunnerUserId) { + await touchIssueForUserInbox(txDb, { + companyId: input.routine.companyId, + issueId: existingIssue.id, + userId: manualRunnerUserId, + touchedAt: triggeredAt, + }); + } const updated = await finalizeRun(createdRun.id, { status, linkedIssueId: existingIssue.id, @@ -1383,7 +1442,7 @@ export function routineService( }; }, - runRoutine: async (id: string, input: RunRoutine) => { + runRoutine: async (id: string, input: RunRoutine, actor?: Actor) => { const routine = await getRoutineById(id); if (!routine) throw notFound("Routine not found"); if (routine.status === "archived") throw conflict("Routine is archived"); @@ -1405,6 +1464,7 @@ export function routineService( executionWorkspacePreference: input.executionWorkspacePreference ?? null, executionWorkspaceSettings: (input.executionWorkspaceSettings as Record | null | undefined) ?? null, + actor, }); },