[codex] Split backend control-plane QoL slice (#4700)

## Thinking Path

> - Paperclip is the control plane for autonomous AI companies, so
backend task ownership, recovery, review visibility, and company-scoped
limits need to stay enforceable without UI-only coupling.
> - Closed PR #4692 bundled those backend changes with UI workflow,
docs, skills, workflow, and lockfile churn.
> - PAP-2694 asks for a clean backend/control-plane slice from that
closed branch.
> - This branch starts from current `master` and mines only the `cli`,
`packages/db`, `packages/shared`, and `server` contracts/tests needed
for the backend behavior.
> - It explicitly excludes UI workflow/performance work,
`.github/workflows/pr.yml`, `pnpm-lock.yaml`, docs, skills,
package-script, adapter UI build-config, and perf fixture script
changes; the only UI files are fixture/test updates required by the
tightened shared `Company` contract.
> - The benefit is a smaller reviewable PR that preserves the
control-plane fixes while staying under Greptile s 100-file review
limit.

## What Changed

- Added company-scoped attachment-size limits through DB
schema/migrations, shared company portability contracts, CLI
import/export coverage, and server attachment upload enforcement.
- Added productivity review service/API behavior for no-comment streak,
long-active, and high-churn review issues, including request-depth
clamping and issue summary exposure.
- Hardened issue ownership and recovery/control-plane paths: peer-agent
mutation denial, issue tree pause/resume behavior, stranded recovery
origins, and related activity/test coverage.
- Preserved related backend contract updates for routine timestamp
variables and managed agent instruction bundles because they live in
shared/server contracts from the source branch.
- Addressed Greptile feedback by making `Company.attachmentMaxBytes`
non-optional, simplifying review request-depth clamping, fixing the
migration final newline, and enforcing the process-level attachment cap
as the final ceiling for uploads.
- Added minimal company fixtures needed for repo-wide typecheck/build
and kept the PR to 66 changed files with forbidden/non-slice paths
excluded.

## Verification

- `pnpm install --frozen-lockfile`
- `git diff --check origin/master..HEAD`
- `git diff --name-only origin/master..HEAD | wc -l` -> 66 files
- `git diff --name-only origin/master..HEAD -- .github/workflows/pr.yml
pnpm-lock.yaml package.json doc skills .agents scripts
packages/adapters` -> no output
- `pnpm exec vitest run --config vitest.config.ts
packages/shared/src/validators/issue.test.ts
packages/shared/src/routine-variables.test.ts
packages/shared/src/adapter-types.test.ts
cli/src/__tests__/company-import-export-e2e.test.ts
cli/src/__tests__/company.test.ts
server/src/__tests__/productivity-review-service.test.ts
server/src/__tests__/issue-tree-control-service.test.ts
server/src/__tests__/issue-tree-control-routes.test.ts
server/src/__tests__/issue-agent-mutation-ownership-routes.test.ts
server/src/__tests__/issue-attachment-routes.test.ts
server/src/__tests__/heartbeat-process-recovery.test.ts
server/src/__tests__/issues-service.test.ts` -> 12 files, 147 tests
passed
- `pnpm exec vitest run --config vitest.config.ts
cli/src/__tests__/company-delete.test.ts
cli/src/__tests__/company-import-export-e2e.test.ts
server/src/__tests__/productivity-review-service.test.ts` -> 3 files, 18
tests passed
- `pnpm exec vitest run --config vitest.config.ts
server/src/__tests__/issue-attachment-routes.test.ts` -> 1 file, 6 tests
passed
- `pnpm --filter @paperclipai/db typecheck && pnpm --filter
@paperclipai/shared typecheck && pnpm --filter @paperclipai/server
typecheck && pnpm --filter paperclipai typecheck`
- `pnpm --filter @paperclipai/server typecheck`
- `pnpm --filter @paperclipai/ui typecheck && pnpm --filter
@paperclipai/ui build`

## Risks

- Includes migrations `0073_shiny_salo.sql` and
`0074_striped_genesis.sql`; merge ordering matters if another PR adds
migrations first.
- This is intentionally backend-only apart from fixture/test updates
forced by shared type correctness; UI affordances from PR #4692 are not
present here and should land in separate UI slices.
- The worktree install emitted plugin SDK bin-link warnings for unbuilt
plugin packages, but the targeted tests and package typechecks completed
successfully.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected; check the roadmap
first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI Codex, GPT-5 coding agent, tool-enabled terminal/GitHub
workflow. Exact runtime context window was not exposed by the harness.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [x] If this change affects the UI, I have included before/after
screenshots
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge

---------

Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Dotta 2026-04-28 16:46:45 -05:00 committed by GitHub
parent d9f540c331
commit 1991ec9d6f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
66 changed files with 34186 additions and 148 deletions

View file

@ -45,6 +45,7 @@ export function companyService(db: Db) {
issueCounter: companies.issueCounter,
budgetMonthlyCents: companies.budgetMonthlyCents,
spentMonthlyCents: companies.spentMonthlyCents,
attachmentMaxBytes: companies.attachmentMaxBytes,
requireBoardApprovalForNewAgents: companies.requireBoardApprovalForNewAgents,
feedbackDataSharingEnabled: companies.feedbackDataSharingEnabled,
feedbackDataSharingConsentAt: companies.feedbackDataSharingConsentAt,

View file

@ -2428,6 +2428,10 @@ function buildManifestFromPackageFiles(
description: asString(companyFrontmatter.description),
brandColor: asString(paperclipCompany.brandColor),
logoPath: asString(paperclipCompany.logoPath) ?? asString(paperclipCompany.logo),
attachmentMaxBytes:
typeof paperclipCompany.attachmentMaxBytes === "number" && Number.isFinite(paperclipCompany.attachmentMaxBytes)
? Math.max(1, Math.floor(paperclipCompany.attachmentMaxBytes))
: null,
requireBoardApprovalForNewAgents:
typeof paperclipCompany.requireBoardApprovalForNewAgents === "boolean"
? paperclipCompany.requireBoardApprovalForNewAgents
@ -3465,6 +3469,7 @@ export function companyPortabilityService(db: Db, storage?: StorageService) {
company: stripEmptyValues({
brandColor: company.brandColor ?? null,
logoPath: companyLogoPath,
attachmentMaxBytes: company.attachmentMaxBytes,
requireBoardApprovalForNewAgents: company.requireBoardApprovalForNewAgents ? true : undefined,
feedbackDataSharingEnabled: company.feedbackDataSharingEnabled ? true : undefined,
feedbackDataSharingConsentAt: company.feedbackDataSharingConsentAt?.toISOString() ?? null,
@ -3963,6 +3968,7 @@ export function companyPortabilityService(db: Db, storage?: StorageService) {
id: string;
name: string;
requireBoardApprovalForNewAgents?: boolean | null;
attachmentMaxBytes?: number | null;
} | null = null;
let companyAction: "created" | "updated" | "unchanged" = "unchanged";
@ -3985,6 +3991,9 @@ export function companyPortabilityService(db: Db, storage?: StorageService) {
name: companyName,
description: include.company ? (sourceManifest.company?.description ?? null) : null,
brandColor: include.company ? (sourceManifest.company?.brandColor ?? null) : null,
attachmentMaxBytes: include.company
? (sourceManifest.company?.attachmentMaxBytes ?? undefined)
: undefined,
requireBoardApprovalForNewAgents: include.company
? (sourceManifest.company?.requireBoardApprovalForNewAgents ?? false)
: false,
@ -4016,6 +4025,7 @@ export function companyPortabilityService(db: Db, storage?: StorageService) {
name: sourceManifest.company.name,
description: sourceManifest.company.description,
brandColor: sourceManifest.company.brandColor,
attachmentMaxBytes: sourceManifest.company.attachmentMaxBytes ?? undefined,
requireBoardApprovalForNewAgents: sourceManifest.company.requireBoardApprovalForNewAgents,
feedbackDataSharingEnabled: sourceManifest.company.feedbackDataSharingEnabled,
feedbackDataSharingConsentAt: sourceManifest.company.feedbackDataSharingConsentAt

View file

@ -110,6 +110,7 @@ import {
} from "./recovery/index.js";
import { isAutomaticRecoverySuppressedByPauseHold } from "./recovery/pause-hold-guard.js";
import { recoveryService } from "./recovery/service.js";
import { productivityReviewService } from "./productivity-review.js";
import { withAgentStartLock } from "./agent-start-lock.js";
import { redactCurrentUserText, redactCurrentUserValue } from "../log-redaction.js";
import {
@ -2004,6 +2005,7 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
};
const budgets = budgetService(db, budgetHooks);
const recovery = recoveryService(db, { enqueueWakeup });
const productivityReviews = productivityReviewService(db, { enqueueWakeup });
let unsafeTextProjectionPromise: Promise<boolean> | null = null;
async function hasUnsafeTextProjectionDatabase() {
@ -2807,6 +2809,29 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
projectId: issue.projectId,
})
: null;
if (issue) {
const productivityHold = await productivityReviews.isProductivityReviewContinuationHoldActive({
companyId: issue.companyId,
issueId: issue.id,
agentId: run.agentId,
});
if (productivityHold.held) {
await setRunStatus(run.id, run.status, {
livenessReason:
`${run.livenessReason ?? "Run ended without concrete progress"}; continuation held by productivity review ${productivityHold.reviewIdentifier ?? productivityHold.reviewIssueId}`,
});
await productivityReviews.recordContinuationHold({
companyId: issue.companyId,
issueId: issue.id,
runId: run.id,
agentId: run.agentId,
reviewIssueId: productivityHold.reviewIssueId,
trigger: productivityHold.trigger,
reason: productivityHold.reason,
});
return;
}
}
const nextAttempt = readContinuationAttempt(run.continuationAttempt) + 1;
const idempotencyKey = issue
@ -4494,6 +4519,10 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
return recovery.scanSilentActiveRuns(opts);
}
async function reconcileProductivityReviews(opts?: { now?: Date; companyId?: string }) {
return productivityReviews.reconcileProductivityReviews(opts);
}
async function buildRunOutputSilence(
run: Pick<
typeof heartbeatRuns.$inferSelect,
@ -7494,6 +7523,8 @@ export function heartbeatService(db: Db, options: HeartbeatServiceOptions = {})
scanSilentActiveRuns,
reconcileProductivityReviews,
buildRunOutputSilence,
tickTimers: async (now = new Date()) => {

View file

@ -32,6 +32,10 @@ export { routineService } from "./routines.js";
export { costService } from "./costs.js";
export { financeService } from "./finance.js";
export { heartbeatService } from "./heartbeat.js";
export {
productivityReviewService,
PRODUCTIVITY_REVIEW_ORIGIN_KIND,
} from "./productivity-review.js";
export { classifyIssueGraphLiveness, type IssueLivenessFinding } from "./recovery/index.js";
export { dashboardService } from "./dashboard.js";
export { sidebarBadgeService } from "./sidebar-badges.js";

View file

@ -71,7 +71,7 @@ type RestoreTreeStatusResult = TreeStatusUpdateResult & {
const TERMINAL_ISSUE_STATUSES = new Set<IssueStatus>(["done", "cancelled"]);
const ACTIVE_RUN_STATUSES = ["queued", "running"] as const;
const DEFAULT_RELEASE_POLICY: IssueTreeHoldReleasePolicy = { strategy: "manual" };
const MAX_PAUSE_HOLD_GATE_DEPTH = 15;
const MAX_PAUSE_HOLD_ANCESTOR_DEPTH = 100;
export const ISSUE_TREE_CONTROL_INTERACTION_WAKE_REASONS: ReadonlySet<string> = new Set([
"issue_commented",
"issue_reopened_via_comment",
@ -548,6 +548,22 @@ export function issueTreeControlService(db: Db) {
return byIssueId;
}
async function activePauseHoldsForIssueIds(companyId: string, issueIds: string[]) {
if (issueIds.length === 0) return [];
return db
.select()
.from(issueTreeHolds)
.where(
and(
eq(issueTreeHolds.companyId, companyId),
eq(issueTreeHolds.status, "active"),
eq(issueTreeHolds.mode, "pause"),
inArray(issueTreeHolds.rootIssueId, issueIds),
),
)
.orderBy(asc(issueTreeHolds.createdAt), asc(issueTreeHolds.id));
}
async function getActivePauseHoldGate(
companyId: string,
issueId: string,
@ -573,9 +589,12 @@ export function issueTreeControlService(db: Db) {
const holdByRootIssueId = new Map(activePauseHolds.map((hold) => [hold.rootIssueId, hold]));
let currentIssueId: string | null = issueId;
const visited = new Set<string>();
let depth = 0;
while (currentIssueId && !visited.has(currentIssueId) && depth < MAX_PAUSE_HOLD_GATE_DEPTH) {
while (
currentIssueId
&& !visited.has(currentIssueId)
&& visited.size < MAX_PAUSE_HOLD_ANCESTOR_DEPTH
) {
visited.add(currentIssueId);
const hold = holdByRootIssueId.get(currentIssueId);
if (hold) {
@ -596,7 +615,6 @@ export function issueTreeControlService(db: Db) {
.where(and(eq(issues.id, currentIssueId), eq(issues.companyId, companyId)))
.then((rows) => rows[0] ?? null);
currentIssueId = parent?.parentId ?? null;
depth += 1;
}
return null;
@ -690,13 +708,100 @@ export function issueTreeControlService(db: Db) {
releasePolicy?: IssueTreeHoldReleasePolicy | null;
actor: ActorInput;
},
) {
): Promise<{
hold: IssueTreeHold;
preview: IssueTreeControlPreview;
resumedPauseHoldIds?: string[];
}> {
const holdReleasePolicy = normalizeReleasePolicy(input.releasePolicy);
const holdPreview = await preview(companyId, rootIssueId, {
mode: input.mode,
releasePolicy: holdReleasePolicy,
});
if (input.mode === "resume") {
const issueIds = [...new Set(holdPreview.issues.map((issue) => issue.id))];
const activePauseHolds = await activePauseHoldsForIssueIds(companyId, issueIds);
const releaseReason = input.reason ?? "Subtree resume applied.";
const { hold: resumeHold } = await db.transaction(async (tx) => {
const [createdHold] = await tx
.insert(issueTreeHolds)
.values({
companyId,
rootIssueId,
mode: input.mode,
status: "active",
reason: input.reason ?? null,
releasePolicy: holdReleasePolicy as unknown as Record<string, unknown>,
createdByActorType: input.actor.actorType,
createdByAgentId: input.actor.agentId ?? null,
createdByUserId: input.actor.userId ?? (input.actor.actorType === "user" ? input.actor.actorId : null),
createdByRunId: input.actor.runId ?? null,
})
.returning();
const memberRows = holdPreview.issues.map((issue) => ({
companyId,
holdId: createdHold.id,
issueId: issue.id,
parentIssueId: issue.parentId,
depth: issue.depth,
issueIdentifier: issue.identifier,
issueTitle: issue.title,
issueStatus: issue.status,
assigneeAgentId: issue.assigneeAgentId,
assigneeUserId: issue.assigneeUserId,
activeRunId: issue.activeRun?.id ?? null,
activeRunStatus: issue.activeRun?.status ?? null,
skipped: issue.skipped,
skipReason: issue.skipReason,
}));
const createdMembers = memberRows.length > 0
? await tx
.insert(issueTreeHoldMembers)
.values(memberRows)
.returning()
: [];
return { hold: toHold(createdHold, createdMembers) };
});
const resumedPauseHoldIds = activePauseHolds.map((hold) => hold.id);
if (resumedPauseHoldIds.length > 0) {
await Promise.all(
activePauseHolds.map((pauseHold) =>
releaseHold(companyId, pauseHold.rootIssueId, pauseHold.id, {
reason: releaseReason,
metadata: {
resumedByResumeHoldId: resumeHold.id,
resumeHoldMode: "tree_resume",
resumedPauseHoldId: pauseHold.id,
},
actor: input.actor,
}),
),
);
}
const releasedResumeHold = await releaseHold(companyId, rootIssueId, resumeHold.id, {
reason: releaseReason,
metadata: {
resumedPauseHoldIds,
resumeMode: "subtree",
...(input.releasePolicy ? { releasePolicy: holdReleasePolicy } : {}),
},
actor: input.actor,
});
return {
hold: releasedResumeHold,
preview: holdPreview,
resumedPauseHoldIds,
};
}
const { hold, members } = await db.transaction(async (tx) => {
const [createdHold] = await tx
.insert(issueTreeHolds)

View file

@ -27,8 +27,13 @@ import {
projectWorkspaces,
projects,
} from "@paperclipai/db";
import type { IssueBlockerAttention, IssueRelationIssueSummary } from "@paperclipai/shared";
import { extractAgentMentionIds, extractProjectMentionIds, isUuidLike } from "@paperclipai/shared";
import type {
IssueBlockerAttention,
IssueProductivityReview,
IssueProductivityReviewTrigger,
IssueRelationIssueSummary,
} from "@paperclipai/shared";
import { clampIssueRequestDepth, extractAgentMentionIds, extractProjectMentionIds, isUuidLike } from "@paperclipai/shared";
import { conflict, notFound, unprocessable } from "../errors.js";
import {
defaultIssueExecutionWorkspaceSettingsForProject,
@ -107,6 +112,7 @@ export interface IssueFilters {
includeBlockedBy?: boolean;
q?: string;
limit?: number;
offset?: number;
}
type IssueRow = typeof issues.$inferSelect;
@ -666,6 +672,17 @@ const BLOCKER_ATTENTION_ACTIVE_WAKE_STATUSES = ["queued", "deferred_issue_execut
const BLOCKER_ATTENTION_PENDING_INTERACTION_STATUSES = ["pending"];
const BLOCKER_ATTENTION_PENDING_APPROVAL_STATUSES = ["pending", "revision_requested"];
const BLOCKER_ATTENTION_OPEN_RECOVERY_ORIGIN_KIND = "harness_liveness_escalation";
const PRODUCTIVITY_REVIEW_ORIGIN_KIND = "issue_productivity_review";
const PRODUCTIVITY_REVIEW_TERMINAL_STATUSES = ["done", "cancelled"];
const PRODUCTIVITY_REVIEW_ACTIVITY_ACTIONS = [
"issue.productivity_review_created",
"issue.productivity_review_updated",
];
const PRODUCTIVITY_REVIEW_TRIGGERS: readonly IssueProductivityReviewTrigger[] = [
"no_comment_streak",
"long_active_duration",
"high_churn",
];
const BLOCKER_ATTENTION_OPEN_RECOVERY_TERMINAL_STATUSES = ["done", "cancelled"];
const BLOCKER_ATTENTION_MAX_DEPTH = 8;
const BLOCKER_ATTENTION_MAX_NODES = 2000;
@ -876,6 +893,114 @@ async function terminalExplicitBlockersByRoot(
return terminalByRoot;
}
function readProductivityReviewTrigger(value: unknown): IssueProductivityReviewTrigger | null {
if (typeof value !== "string") return null;
return PRODUCTIVITY_REVIEW_TRIGGERS.includes(value as IssueProductivityReviewTrigger)
? (value as IssueProductivityReviewTrigger)
: null;
}
function readProductivityReviewStreak(value: unknown): number | null {
if (typeof value !== "number" || !Number.isFinite(value) || value < 0) return null;
return Math.floor(value);
}
async function listIssueProductivityReviewMap(
dbOrTx: any,
companyId: string,
sourceIssueIds: string[],
): Promise<Map<string, IssueProductivityReview>> {
const map = new Map<string, IssueProductivityReview>();
if (sourceIssueIds.length === 0) return map;
const reviewRows: Array<{
sourceIssueId: string | null;
reviewIssueId: string;
reviewIdentifier: string | null;
status: string;
priority: string;
createdAt: Date;
updatedAt: Date;
}> = [];
for (const chunk of chunkList([...new Set(sourceIssueIds)], ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
const rows = await dbOrTx
.select({
sourceIssueId: issues.originId,
reviewIssueId: issues.id,
reviewIdentifier: issues.identifier,
status: issues.status,
priority: issues.priority,
createdAt: issues.createdAt,
updatedAt: issues.updatedAt,
})
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, PRODUCTIVITY_REVIEW_ORIGIN_KIND),
inArray(issues.originId, chunk),
isNull(issues.hiddenAt),
notInArray(issues.status, PRODUCTIVITY_REVIEW_TERMINAL_STATUSES),
),
);
reviewRows.push(...rows);
}
if (reviewRows.length === 0) return map;
const reviewIssueIds = reviewRows.map((row) => row.reviewIssueId);
const triggerByReviewIssueId = new Map<
string,
{ trigger: IssueProductivityReviewTrigger | null; noCommentStreak: number | null }
>();
for (const chunk of chunkList(reviewIssueIds, ISSUE_LIST_RELATED_QUERY_CHUNK_SIZE)) {
const detailRows = await dbOrTx
.select({
entityId: activityLog.entityId,
details: activityLog.details,
createdAt: activityLog.createdAt,
})
.from(activityLog)
.where(
and(
eq(activityLog.companyId, companyId),
eq(activityLog.entityType, "issue"),
inArray(activityLog.entityId, chunk),
inArray(activityLog.action, PRODUCTIVITY_REVIEW_ACTIVITY_ACTIONS),
),
)
.orderBy(desc(activityLog.createdAt));
for (const row of detailRows as Array<{
entityId: string;
details: Record<string, unknown> | null;
createdAt: Date;
}>) {
if (triggerByReviewIssueId.has(row.entityId)) continue;
triggerByReviewIssueId.set(row.entityId, {
trigger: readProductivityReviewTrigger(row.details?.trigger),
noCommentStreak: readProductivityReviewStreak(row.details?.noCommentStreak),
});
}
}
for (const row of reviewRows) {
if (!row.sourceIssueId) continue;
const detail = triggerByReviewIssueId.get(row.reviewIssueId);
map.set(row.sourceIssueId, {
reviewIssueId: row.reviewIssueId,
reviewIdentifier: row.reviewIdentifier,
status: row.status as IssueProductivityReview["status"],
priority: row.priority as IssueProductivityReview["priority"],
trigger: detail?.trigger ?? null,
noCommentStreak: detail?.noCommentStreak ?? null,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
});
}
return map;
}
async function listIssueBlockerAttentionMap(
dbOrTx: any,
companyId: string,
@ -1959,6 +2084,9 @@ export function issueService(db: Db) {
const limit = typeof filters?.limit === "number" && Number.isFinite(filters.limit)
? Math.max(1, Math.floor(filters.limit))
: undefined;
const offset = typeof filters?.offset === "number" && Number.isFinite(filters.offset)
? Math.max(0, Math.floor(filters.offset))
: 0;
const touchedByUserId = filters?.touchedByUserId?.trim() || undefined;
const inboxArchivedByUserId = filters?.inboxArchivedByUserId?.trim() || undefined;
const unreadForUserId = filters?.unreadForUserId?.trim() || undefined;
@ -2081,8 +2209,12 @@ export function issueService(db: Db) {
asc(priorityOrder),
desc(canonicalLastActivityAt),
desc(issues.updatedAt),
desc(issues.id),
);
const rows = (limit === undefined ? await baseQuery : await baseQuery.limit(limit)).map((row) => ({
const pageQuery = offset > 0
? (limit === undefined ? baseQuery.offset(offset) : baseQuery.limit(limit).offset(offset))
: (limit === undefined ? baseQuery : baseQuery.limit(limit));
const rows = (await pageQuery).map((row) => ({
...row,
description: decodeDatabaseTextPreview(row.description, ISSUE_LIST_DESCRIPTION_MAX_CHARS),
}));
@ -2108,7 +2240,10 @@ export function issueService(db: Db) {
]);
const statsByIssueId = new Map(statsRows.map((row) => [row.issueId, row]));
const lastActivityByIssueId = new Map(lastActivityRows.map((row) => [row.issueId, row]));
const blockerAttentionByIssueId = await listIssueBlockerAttentionMap(db, companyId, withRuns);
const [blockerAttentionByIssueId, productivityReviewByIssueId] = await Promise.all([
listIssueBlockerAttentionMap(db, companyId, withRuns),
listIssueProductivityReviewMap(db, companyId, issueIds),
]);
if (!contextUserId) {
return withRuns.map((row) => {
@ -2123,6 +2258,9 @@ export function issueService(db: Db) {
...(includeBlockedBy ? { blockedBy: blockedByMap.get(row.id) ?? [] } : {}),
lastActivityAt,
...(blockerAttentionByIssueId.has(row.id) ? { blockerAttention: blockerAttentionByIssueId.get(row.id) } : {}),
...(productivityReviewByIssueId.has(row.id)
? { productivityReview: productivityReviewByIssueId.get(row.id) }
: {}),
};
});
}
@ -2141,6 +2279,9 @@ export function issueService(db: Db) {
...(includeBlockedBy ? { blockedBy: blockedByMap.get(row.id) ?? [] } : {}),
lastActivityAt,
...(blockerAttentionByIssueId.has(row.id) ? { blockerAttention: blockerAttentionByIssueId.get(row.id) } : {}),
...(productivityReviewByIssueId.has(row.id)
? { productivityReview: productivityReviewByIssueId.get(row.id) }
: {}),
...deriveIssueUserContext(row, contextUserId, {
myLastCommentAt: statsByIssueId.get(row.id)?.myLastCommentAt ?? null,
myLastReadAt: readByIssueId.get(row.id) ?? null,
@ -2292,6 +2433,14 @@ export function issueService(db: Db) {
return listIssueBlockerAttentionMap(dbOrTx, companyId, issueRows);
},
listProductivityReviews: async (
companyId: string,
sourceIssueIds: string[],
dbOrTx: any = db,
) => {
return listIssueProductivityReviewMap(dbOrTx, companyId, sourceIssueIds);
},
listWakeableBlockedDependents: async (blockerIssueId: string) => {
const blockerIssue = await db
.select({ id: issues.id, companyId: issues.companyId })
@ -2458,7 +2607,9 @@ export function issueService(db: Db) {
parentId: parent.id,
projectId: issueData.projectId ?? parent.projectId,
goalId: issueData.goalId ?? parent.goalId,
requestDepth: Math.max(parent.requestDepth + 1, issueData.requestDepth ?? 0),
requestDepth: clampIssueRequestDepth(
Math.max(clampIssueRequestDepth(parent.requestDepth) + 1, issueData.requestDepth ?? 0),
),
description: appendAcceptanceCriteriaToDescription(issueData.description, acceptanceCriteria),
inheritExecutionWorkspaceFromIssueId: parent.id,
});
@ -2615,6 +2766,7 @@ export function issueService(db: Db) {
const values = {
...issueData,
requestDepth: clampIssueRequestDepth(issueData.requestDepth),
originKind: issueData.originKind ?? "manual",
goalId: resolveIssueGoalId({
projectId: issueData.projectId,
@ -2700,6 +2852,9 @@ export function issueService(db: Db) {
...issueData,
updatedAt: new Date(),
};
if (issueData.requestDepth !== undefined) {
patch.requestDepth = clampIssueRequestDepth(issueData.requestDepth);
}
const nextAssigneeAgentId =
issueData.assigneeAgentId !== undefined ? issueData.assigneeAgentId : existing.assigneeAgentId;

View file

@ -0,0 +1,792 @@
import { and, asc, desc, eq, gt, inArray, isNull, notInArray, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import { clampIssueRequestDepth } from "@paperclipai/shared";
import {
agents,
companies,
costEvents,
heartbeatRuns,
issueComments,
issues,
projects,
} from "@paperclipai/db";
import { logger } from "../middleware/logger.js";
import { logActivity } from "./activity-log.js";
import { budgetService } from "./budgets.js";
import { issueService } from "./issues.js";
import { RECOVERY_ORIGIN_KINDS } from "./recovery/origins.js";
export const PRODUCTIVITY_REVIEW_ORIGIN_KIND = RECOVERY_ORIGIN_KINDS.issueProductivityReview;
export const DEFAULT_PRODUCTIVITY_REVIEW_NO_COMMENT_STREAK_RUNS = 10;
export const DEFAULT_PRODUCTIVITY_REVIEW_LONG_ACTIVE_HOURS = 6;
export const DEFAULT_PRODUCTIVITY_REVIEW_HIGH_CHURN_HOURLY = 10;
export const DEFAULT_PRODUCTIVITY_REVIEW_HIGH_CHURN_SIX_HOURS = 30;
export const DEFAULT_PRODUCTIVITY_REVIEW_RESOLVED_SNOOZE_MS = 6 * 60 * 60 * 1000;
const TERMINAL_RUN_STATUSES = ["succeeded", "failed", "cancelled", "timed_out"] as const;
const ACTIVE_RUN_STATUSES = ["queued", "running", "scheduled_retry"] as const;
const MAX_CANDIDATE_ISSUES = 250;
const MAX_RUNS_FOR_STREAK = 100;
const MAX_PARENT_WALK_DEPTH = 25;
type IssueRow = typeof issues.$inferSelect;
type AgentRow = typeof agents.$inferSelect;
type HeartbeatRunRow = typeof heartbeatRuns.$inferSelect;
type ProductivityReviewTrigger = "no_comment_streak" | "long_active_duration" | "high_churn";
type ProductivityReviewThresholds = {
noCommentStreakRuns: number;
longActiveMs: number;
highChurnHourly: number;
highChurnSixHours: number;
resolvedSnoozeMs: number;
};
type ProductivityReviewEvidence = {
trigger: ProductivityReviewTrigger;
triggerReasons: string[];
sourceIssue: IssueRow;
sourceAgent: AgentRow;
noCommentStreak: number;
totalRunCount: number;
terminalRunCount: number;
activeRunCount: number;
runCountLastHour: number;
runCountLastSixHours: number;
commentCount: number;
commentCountLastHour: number;
commentCountLastSixHours: number;
elapsedMs: number | null;
latestRuns: HeartbeatRunRow[];
latestComments: Array<typeof issueComments.$inferSelect>;
costCents: number;
usageSamples: Array<{ runId: string; usageJson: Record<string, unknown> | null }>;
nextAction: string | null;
thresholds: ProductivityReviewThresholds;
generatedAt: Date;
};
type EnqueueWakeup = (
agentId: string,
opts?: {
source?: "timer" | "assignment" | "on_demand" | "automation";
triggerDetail?: "manual" | "ping" | "callback" | "system";
reason?: string | null;
payload?: Record<string, unknown> | null;
requestedByActorType?: "user" | "agent" | "system";
requestedByActorId?: string | null;
contextSnapshot?: Record<string, unknown>;
},
) => Promise<unknown | null>;
function productivityReviewFingerprint(sourceIssueId: string) {
return `productivity-review:${sourceIssueId}`;
}
function issueRunScopeSql(issueId: string) {
return sql`(
${heartbeatRuns.contextSnapshot}->>'issueId' = ${issueId}
or ${heartbeatRuns.contextSnapshot}->>'taskId' = ${issueId}
or ${heartbeatRuns.contextSnapshot}->>'taskKey' = ${issueId}
)`;
}
function msToHuman(ms: number | null) {
if (ms === null) return "unknown";
const minutes = Math.floor(ms / 60_000);
if (minutes < 60) return `${minutes}m`;
const hours = Math.floor(minutes / 60);
const days = Math.floor(hours / 24);
if (days > 0) return `${days}d ${hours % 24}h`;
return `${hours}h ${minutes % 60}m`;
}
function issueUiLink(issue: { identifier: string | null; id: string }, prefix: string) {
const label = issue.identifier ?? issue.id;
return `[${label}](/${prefix}/issues/${label})`;
}
function runUiLink(run: { id: string; agentId: string }, prefix: string) {
return `[${run.id}](/${prefix}/agents/${run.agentId}/runs/${run.id})`;
}
function truncateInline(value: string | null | undefined, max = 260) {
if (!value) return "";
const compact = value.replace(/\s+/g, " ").trim();
return compact.length <= max ? compact : `${compact.slice(0, max - 3)}...`;
}
function readPositiveInteger(value: number, fallback: number) {
return Number.isFinite(value) && value > 0 ? Math.floor(value) : fallback;
}
function buildThresholds(overrides?: Partial<ProductivityReviewThresholds>): ProductivityReviewThresholds {
return {
noCommentStreakRuns: readPositiveInteger(
overrides?.noCommentStreakRuns ?? DEFAULT_PRODUCTIVITY_REVIEW_NO_COMMENT_STREAK_RUNS,
DEFAULT_PRODUCTIVITY_REVIEW_NO_COMMENT_STREAK_RUNS,
),
longActiveMs: readPositiveInteger(
overrides?.longActiveMs ?? DEFAULT_PRODUCTIVITY_REVIEW_LONG_ACTIVE_HOURS * 60 * 60 * 1000,
DEFAULT_PRODUCTIVITY_REVIEW_LONG_ACTIVE_HOURS * 60 * 60 * 1000,
),
highChurnHourly: readPositiveInteger(
overrides?.highChurnHourly ?? DEFAULT_PRODUCTIVITY_REVIEW_HIGH_CHURN_HOURLY,
DEFAULT_PRODUCTIVITY_REVIEW_HIGH_CHURN_HOURLY,
),
highChurnSixHours: readPositiveInteger(
overrides?.highChurnSixHours ?? DEFAULT_PRODUCTIVITY_REVIEW_HIGH_CHURN_SIX_HOURS,
DEFAULT_PRODUCTIVITY_REVIEW_HIGH_CHURN_SIX_HOURS,
),
resolvedSnoozeMs: readPositiveInteger(
overrides?.resolvedSnoozeMs ?? DEFAULT_PRODUCTIVITY_REVIEW_RESOLVED_SNOOZE_MS,
DEFAULT_PRODUCTIVITY_REVIEW_RESOLVED_SNOOZE_MS,
),
};
}
function choosePrimaryTrigger(input: {
noComment: boolean;
longActive: boolean;
highChurn: boolean;
}): ProductivityReviewTrigger | null {
if (input.noComment) return "no_comment_streak";
if (input.highChurn) return "high_churn";
if (input.longActive) return "long_active_duration";
return null;
}
function isSoftStopTrigger(trigger: ProductivityReviewTrigger) {
return trigger === "no_comment_streak" || trigger === "high_churn";
}
function formatTrigger(trigger: ProductivityReviewTrigger) {
if (trigger === "no_comment_streak") return "No-comment streak";
if (trigger === "high_churn") return "High churn";
return "Long active duration";
}
export function productivityReviewService(db: Db, deps?: { enqueueWakeup?: EnqueueWakeup }) {
const issuesSvc = issueService(db);
const budgets = budgetService(db);
async function getCompanyIssuePrefix(companyId: string) {
return db
.select({ issuePrefix: companies.issuePrefix })
.from(companies)
.where(eq(companies.id, companyId))
.then((rows) => rows[0]?.issuePrefix ?? "PAP");
}
async function getAgent(agentId: string) {
return db
.select()
.from(agents)
.where(eq(agents.id, agentId))
.then((rows) => rows[0] ?? null);
}
function isAgentInvokable(agent: AgentRow | null | undefined) {
return Boolean(agent && !["paused", "terminated", "pending_approval"].includes(agent.status));
}
async function isProductivityReviewDescendant(issue: Pick<IssueRow, "companyId" | "parentId">) {
let parentId = issue.parentId;
let depth = 0;
while (parentId && depth < MAX_PARENT_WALK_DEPTH) {
const parent = await db
.select({ id: issues.id, parentId: issues.parentId, originKind: issues.originKind })
.from(issues)
.where(and(eq(issues.companyId, issue.companyId), eq(issues.id, parentId)))
.then((rows) => rows[0] ?? null);
if (!parent) return false;
if (parent.originKind === PRODUCTIVITY_REVIEW_ORIGIN_KIND) return true;
parentId = parent.parentId;
depth += 1;
}
return false;
}
async function findOpenProductivityReview(companyId: string, sourceIssueId: string) {
return db
.select()
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, PRODUCTIVITY_REVIEW_ORIGIN_KIND),
eq(issues.originId, sourceIssueId),
isNull(issues.hiddenAt),
notInArray(issues.status, ["done", "cancelled"]),
),
)
.orderBy(desc(issues.updatedAt))
.limit(1)
.then((rows) => rows[0] ?? null);
}
async function findRecentResolvedProductivityReview(
companyId: string,
sourceIssueId: string,
thresholds: ProductivityReviewThresholds,
now: Date,
) {
const cutoff = new Date(now.getTime() - thresholds.resolvedSnoozeMs);
return db
.select({ id: issues.id, identifier: issues.identifier, status: issues.status, updatedAt: issues.updatedAt })
.from(issues)
.where(
and(
eq(issues.companyId, companyId),
eq(issues.originKind, PRODUCTIVITY_REVIEW_ORIGIN_KIND),
eq(issues.originId, sourceIssueId),
eq(issues.status, "done"),
gt(issues.updatedAt, cutoff),
),
)
.orderBy(desc(issues.updatedAt))
.limit(1)
.then((rows) => rows[0] ?? null);
}
async function countIssueRunsSince(companyId: string, agentId: string, issueId: string, since: Date) {
return db
.select({ count: sql<number>`count(*)::int` })
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.companyId, companyId),
eq(heartbeatRuns.agentId, agentId),
issueRunScopeSql(issueId),
sql`coalesce(${heartbeatRuns.startedAt}, ${heartbeatRuns.createdAt}) >= ${since.toISOString()}::timestamptz`,
),
)
.then((rows) => rows[0]?.count ?? 0);
}
async function countIssueCommentsSince(companyId: string, issueId: string, agentId: string, since?: Date) {
return db
.select({ count: sql<number>`count(*)::int` })
.from(issueComments)
.innerJoin(heartbeatRuns, eq(heartbeatRuns.id, issueComments.createdByRunId))
.where(
and(
eq(issueComments.companyId, companyId),
eq(issueComments.issueId, issueId),
eq(issueComments.authorAgentId, agentId),
eq(heartbeatRuns.companyId, companyId),
eq(heartbeatRuns.agentId, agentId),
issueRunScopeSql(issueId),
since ? sql`${issueComments.createdAt} >= ${since.toISOString()}::timestamptz` : undefined,
),
)
.then((rows) => rows[0]?.count ?? 0);
}
async function collectEvidence(
sourceIssue: IssueRow,
sourceAgent: AgentRow,
thresholds: ProductivityReviewThresholds,
now: Date,
): Promise<ProductivityReviewEvidence | null> {
const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);
const sixHoursAgo = new Date(now.getTime() - 6 * 60 * 60 * 1000);
const latestRuns = await db
.select()
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.companyId, sourceIssue.companyId),
eq(heartbeatRuns.agentId, sourceAgent.id),
issueRunScopeSql(sourceIssue.id),
),
)
.orderBy(desc(heartbeatRuns.createdAt), desc(heartbeatRuns.id))
.limit(MAX_RUNS_FOR_STREAK);
const runIds = latestRuns.map((run) => run.id);
const commentRunIds = new Set<string>();
if (runIds.length > 0) {
const commentRows = await db
.select({ createdByRunId: issueComments.createdByRunId })
.from(issueComments)
.where(
and(
eq(issueComments.companyId, sourceIssue.companyId),
eq(issueComments.issueId, sourceIssue.id),
inArray(issueComments.createdByRunId, runIds),
),
);
for (const row of commentRows) {
if (row.createdByRunId) commentRunIds.add(row.createdByRunId);
}
}
const terminalRuns = latestRuns.filter((run) =>
TERMINAL_RUN_STATUSES.includes(run.status as (typeof TERMINAL_RUN_STATUSES)[number]),
);
let noCommentStreak = 0;
for (const run of terminalRuns) {
if (commentRunIds.has(run.id)) break;
noCommentStreak += 1;
}
const [
runCountLastHour,
runCountLastSixHours,
assigneeRunCommentCount,
assigneeRunCommentCountLastHour,
assigneeRunCommentCountLastSixHours,
latestComments,
costRow,
] = await Promise.all([
countIssueRunsSince(sourceIssue.companyId, sourceAgent.id, sourceIssue.id, oneHourAgo),
countIssueRunsSince(sourceIssue.companyId, sourceAgent.id, sourceIssue.id, sixHoursAgo),
countIssueCommentsSince(sourceIssue.companyId, sourceIssue.id, sourceAgent.id),
countIssueCommentsSince(sourceIssue.companyId, sourceIssue.id, sourceAgent.id, oneHourAgo),
countIssueCommentsSince(sourceIssue.companyId, sourceIssue.id, sourceAgent.id, sixHoursAgo),
db
.select({ comment: issueComments })
.from(issueComments)
.innerJoin(heartbeatRuns, eq(heartbeatRuns.id, issueComments.createdByRunId))
.where(
and(
eq(issueComments.companyId, sourceIssue.companyId),
eq(issueComments.issueId, sourceIssue.id),
eq(issueComments.authorAgentId, sourceAgent.id),
eq(heartbeatRuns.companyId, sourceIssue.companyId),
eq(heartbeatRuns.agentId, sourceAgent.id),
issueRunScopeSql(sourceIssue.id),
),
)
.orderBy(desc(issueComments.createdAt), desc(issueComments.id))
.limit(5)
.then((rows) => rows.map((row) => row.comment)),
db
.select({ costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::int` })
.from(costEvents)
.where(and(eq(costEvents.companyId, sourceIssue.companyId), eq(costEvents.issueId, sourceIssue.id)))
.then((rows) => rows[0] ?? { costCents: 0 }),
]);
const activeRunCount = latestRuns.filter((run) =>
ACTIVE_RUN_STATUSES.includes(run.status as (typeof ACTIVE_RUN_STATUSES)[number]),
).length;
const activeStartedAt = sourceIssue.startedAt ?? sourceIssue.executionLockedAt ?? null;
const elapsedMs = sourceIssue.status === "in_progress" && activeStartedAt
? Math.max(0, now.getTime() - activeStartedAt.getTime())
: null;
const noComment = noCommentStreak >= thresholds.noCommentStreakRuns;
const longActive = elapsedMs !== null && elapsedMs >= thresholds.longActiveMs;
const highChurn =
runCountLastHour >= thresholds.highChurnHourly ||
assigneeRunCommentCountLastHour >= thresholds.highChurnHourly ||
runCountLastSixHours >= thresholds.highChurnSixHours ||
assigneeRunCommentCountLastSixHours >= thresholds.highChurnSixHours;
const trigger = choosePrimaryTrigger({ noComment, longActive, highChurn });
if (!trigger) return null;
const triggerReasons: string[] = [];
if (noComment) triggerReasons.push(`${noCommentStreak} consecutive completed issue-linked runs had no run-created issue comment`);
if (longActive) triggerReasons.push(`current active episode has lasted ${msToHuman(elapsedMs)}`);
if (highChurn) {
triggerReasons.push(
`${runCountLastHour} runs/${assigneeRunCommentCountLastHour} assignee-run comments in 1h; ${runCountLastSixHours} runs/${assigneeRunCommentCountLastSixHours} assignee-run comments in 6h`,
);
}
return {
trigger,
triggerReasons,
sourceIssue,
sourceAgent,
noCommentStreak,
totalRunCount: latestRuns.length,
terminalRunCount: terminalRuns.length,
activeRunCount,
runCountLastHour,
runCountLastSixHours,
commentCount: assigneeRunCommentCount,
commentCountLastHour: assigneeRunCommentCountLastHour,
commentCountLastSixHours: assigneeRunCommentCountLastSixHours,
elapsedMs,
latestRuns: latestRuns.slice(0, 5),
latestComments,
costCents: costRow.costCents,
usageSamples: latestRuns
.filter((run) => run.usageJson)
.slice(0, 3)
.map((run) => ({ runId: run.id, usageJson: run.usageJson ?? null })),
nextAction: latestRuns.find((run) => run.nextAction)?.nextAction ?? null,
thresholds,
generatedAt: now,
};
}
async function resolveReviewOwnerAgentId(sourceIssue: IssueRow, sourceAgent: AgentRow) {
const candidateIds: string[] = [];
if (sourceAgent.reportsTo) candidateIds.push(sourceAgent.reportsTo);
if (sourceIssue.createdByAgentId) candidateIds.push(sourceIssue.createdByAgentId);
if (sourceIssue.projectId) {
const project = await db
.select({ leadAgentId: projects.leadAgentId })
.from(projects)
.where(and(eq(projects.companyId, sourceIssue.companyId), eq(projects.id, sourceIssue.projectId)))
.then((rows) => rows[0] ?? null);
if (project?.leadAgentId) candidateIds.push(project.leadAgentId);
}
const roleCandidates = await db
.select({ id: agents.id })
.from(agents)
.where(and(eq(agents.companyId, sourceIssue.companyId), inArray(agents.role, ["cto", "ceo"])))
.orderBy(sql`case when ${agents.role} = 'cto' then 0 else 1 end`, asc(agents.createdAt), asc(agents.id));
candidateIds.push(...roleCandidates.map((agent) => agent.id));
const seen = new Set<string>();
for (const agentId of candidateIds) {
if (seen.has(agentId)) continue;
seen.add(agentId);
const candidate = await getAgent(agentId);
if (!candidate || candidate.companyId !== sourceIssue.companyId || !isAgentInvokable(candidate)) continue;
const budgetBlock = await budgets.getInvocationBlock(sourceIssue.companyId, candidate.id, {
issueId: sourceIssue.id,
projectId: sourceIssue.projectId ?? null,
});
if (!budgetBlock) return candidate.id;
}
return null;
}
function buildReviewMarkdown(evidence: ProductivityReviewEvidence, prefix: string) {
const latestRuns = evidence.latestRuns.length > 0
? evidence.latestRuns.map((run) =>
`- ${runUiLink(run, prefix)} \`${run.status}\` liveness \`${run.livenessState ?? "unknown"}\`, created ${run.createdAt.toISOString()}${run.nextAction ? `, next action: ${truncateInline(run.nextAction, 160)}` : ""}`,
).join("\n")
: "- none";
const latestComments = evidence.latestComments.length > 0
? evidence.latestComments.map((comment) =>
`- ${comment.createdAt.toISOString()}${comment.createdByRunId ? ` run \`${comment.createdByRunId}\`` : ""}: ${truncateInline(comment.body)}`,
).join("\n")
: "- none";
const usage = evidence.usageSamples.length > 0
? evidence.usageSamples.map((sample) => `- \`${sample.runId}\`: \`${JSON.stringify(sample.usageJson).slice(0, 500)}\``).join("\n")
: "- no usage payloads on sampled runs";
return [
"Paperclip detected an unusual productivity/progression pattern on an assigned issue.",
"",
"## Source",
"",
`- Source issue: ${issueUiLink(evidence.sourceIssue, prefix)}`,
`- Assigned agent: ${evidence.sourceAgent.name} (${evidence.sourceAgent.role})`,
`- Primary trigger: \`${evidence.trigger}\` (${formatTrigger(evidence.trigger)})`,
`- Trigger reasons: ${evidence.triggerReasons.join("; ")}`,
`- Generated at: ${evidence.generatedAt.toISOString()}`,
"",
"## Evidence",
"",
`- Total sampled issue-linked runs: ${evidence.totalRunCount}`,
`- Terminal sampled runs: ${evidence.terminalRunCount}`,
`- Active queued/running/scheduled runs: ${evidence.activeRunCount}`,
`- No-comment completed-run streak: ${evidence.noCommentStreak}`,
`- Current active elapsed time: ${msToHuman(evidence.elapsedMs)}`,
`- Runs in rolling windows: ${evidence.runCountLastHour}/1h, ${evidence.runCountLastSixHours}/6h`,
`- Assignee run-linked comments total/window: ${evidence.commentCount} total, ${evidence.commentCountLastHour}/1h, ${evidence.commentCountLastSixHours}/6h`,
`- Cost events total: ${evidence.costCents} cents`,
`- Current next action: ${evidence.nextAction ? truncateInline(evidence.nextAction, 500) : "none recorded"}`,
"",
"## Thresholds",
"",
`- No-comment streak: ${evidence.thresholds.noCommentStreakRuns} completed runs`,
`- Long active duration: ${msToHuman(evidence.thresholds.longActiveMs)}`,
`- High churn: ${evidence.thresholds.highChurnHourly}/1h or ${evidence.thresholds.highChurnSixHours}/6h runs/assignee-run comments`,
`- Resolved-review snooze: ${msToHuman(evidence.thresholds.resolvedSnoozeMs)}`,
"",
"## Latest Runs",
"",
latestRuns,
"",
"## Latest Assignee Run Comments",
"",
latestComments,
"",
"## Usage Samples",
"",
usage,
"",
"## Manager Decision",
"",
"- Close as productive if this pattern is expected.",
"- Continue with a snooze window if the current work should keep running without repeat review spam.",
"- Request decomposition, reroute, block with an unblock owner, or stop/cancel the source work if the work is inefficient.",
].join("\n");
}
function buildRefreshComment(evidence: ProductivityReviewEvidence, prefix: string) {
return [
"Productivity review evidence refreshed.",
"",
`- Source issue: ${issueUiLink(evidence.sourceIssue, prefix)}`,
`- Trigger: \`${evidence.trigger}\` (${formatTrigger(evidence.trigger)})`,
`- Reasons: ${evidence.triggerReasons.join("; ")}`,
`- No-comment streak: ${evidence.noCommentStreak}`,
`- Runs/assignee comments: ${evidence.runCountLastHour}/${evidence.commentCountLastHour} in 1h, ${evidence.runCountLastSixHours}/${evidence.commentCountLastSixHours} in 6h`,
`- Next action: ${evidence.nextAction ? truncateInline(evidence.nextAction, 300) : "none recorded"}`,
].join("\n");
}
async function createOrUpdateReview(
evidence: ProductivityReviewEvidence,
opts: { prefix: string },
) {
const existing = await findOpenProductivityReview(evidence.sourceIssue.companyId, evidence.sourceIssue.id);
if (existing) {
await issuesSvc.addComment(existing.id, buildRefreshComment(evidence, opts.prefix), {});
await logActivity(db, {
companyId: evidence.sourceIssue.companyId,
actorType: "system",
actorId: "system",
action: "issue.productivity_review_updated",
entityType: "issue",
entityId: existing.id,
agentId: existing.assigneeAgentId,
details: {
source: "productivity_review.reconcile",
sourceIssueId: evidence.sourceIssue.id,
trigger: evidence.trigger,
noCommentStreak: evidence.noCommentStreak,
runCountLastHour: evidence.runCountLastHour,
commentCountLastHour: evidence.commentCountLastHour,
},
});
return { kind: "updated" as const, reviewIssueId: existing.id };
}
const ownerAgentId = await resolveReviewOwnerAgentId(evidence.sourceIssue, evidence.sourceAgent);
let review: Awaited<ReturnType<typeof issuesSvc.create>>;
try {
review = await issuesSvc.create(evidence.sourceIssue.companyId, {
title: `Review productivity for ${evidence.sourceIssue.identifier ?? evidence.sourceIssue.title}`,
description: buildReviewMarkdown(evidence, opts.prefix),
status: "todo",
priority: evidence.trigger === "long_active_duration" ? "medium" : "high",
parentId: evidence.sourceIssue.id,
projectId: evidence.sourceIssue.projectId,
goalId: evidence.sourceIssue.goalId,
billingCode: evidence.sourceIssue.billingCode,
assigneeAgentId: ownerAgentId,
originKind: PRODUCTIVITY_REVIEW_ORIGIN_KIND,
originId: evidence.sourceIssue.id,
originFingerprint: productivityReviewFingerprint(evidence.sourceIssue.id),
requestDepth: clampIssueRequestDepth(evidence.sourceIssue.requestDepth + 1),
});
} catch (error) {
const maybe = error as { code?: string; constraint?: string; message?: string };
const uniqueConflict = maybe.code === "23505" &&
(
maybe.constraint === "issues_active_productivity_review_uq" ||
typeof maybe.message === "string" && maybe.message.includes("issues_active_productivity_review_uq")
);
if (!uniqueConflict) throw error;
const raced = await findOpenProductivityReview(evidence.sourceIssue.companyId, evidence.sourceIssue.id);
if (!raced) throw error;
return { kind: "existing" as const, reviewIssueId: raced.id };
}
await logActivity(db, {
companyId: evidence.sourceIssue.companyId,
actorType: "system",
actorId: "system",
action: "issue.productivity_review_created",
entityType: "issue",
entityId: review.id,
agentId: ownerAgentId,
details: {
source: "productivity_review.reconcile",
sourceIssueId: evidence.sourceIssue.id,
trigger: evidence.trigger,
noCommentStreak: evidence.noCommentStreak,
runCountLastHour: evidence.runCountLastHour,
commentCountLastHour: evidence.commentCountLastHour,
},
});
if (ownerAgentId && deps?.enqueueWakeup) {
await deps.enqueueWakeup(ownerAgentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: {
issueId: review.id,
sourceIssueId: evidence.sourceIssue.id,
trigger: evidence.trigger,
},
requestedByActorType: "system",
requestedByActorId: "productivity_review",
contextSnapshot: {
issueId: review.id,
taskId: review.id,
wakeReason: "issue_assigned",
source: PRODUCTIVITY_REVIEW_ORIGIN_KIND,
sourceIssueId: evidence.sourceIssue.id,
productivityReviewTrigger: evidence.trigger,
},
});
}
return { kind: "created" as const, reviewIssueId: review.id };
}
async function reconcileProductivityReviews(opts?: {
now?: Date;
companyId?: string;
thresholds?: Partial<ProductivityReviewThresholds>;
}) {
const now = opts?.now ?? new Date();
const thresholds = buildThresholds(opts?.thresholds);
const candidates = await db
.select()
.from(issues)
.where(
and(
opts?.companyId ? eq(issues.companyId, opts.companyId) : undefined,
isNull(issues.hiddenAt),
isNull(issues.assigneeUserId),
inArray(issues.status, ["todo", "in_progress"]),
sql`${issues.assigneeAgentId} is not null`,
sql`${issues.originKind} <> ${PRODUCTIVITY_REVIEW_ORIGIN_KIND}`,
),
)
.orderBy(asc(issues.updatedAt), asc(issues.id))
.limit(MAX_CANDIDATE_ISSUES);
const result = {
scanned: candidates.length,
created: 0,
updated: 0,
existing: 0,
snoozed: 0,
skipped: 0,
failed: 0,
reviewIssueIds: [] as string[],
failedIssueIds: [] as string[],
};
const prefixCache = new Map<string, string>();
for (const candidate of candidates) {
if (!candidate.assigneeAgentId) {
result.skipped += 1;
continue;
}
if (await isProductivityReviewDescendant(candidate)) {
result.skipped += 1;
continue;
}
if (await findRecentResolvedProductivityReview(candidate.companyId, candidate.id, thresholds, now)) {
result.snoozed += 1;
continue;
}
const sourceAgent = await getAgent(candidate.assigneeAgentId);
if (!sourceAgent || sourceAgent.companyId !== candidate.companyId) {
result.skipped += 1;
continue;
}
const evidence = await collectEvidence(candidate, sourceAgent, thresholds, now);
if (!evidence) {
result.skipped += 1;
continue;
}
let prefix = prefixCache.get(candidate.companyId);
if (!prefix) {
prefix = await getCompanyIssuePrefix(candidate.companyId);
prefixCache.set(candidate.companyId, prefix);
}
try {
const outcome = await createOrUpdateReview(evidence, { prefix });
if (outcome.kind === "created") result.created += 1;
else if (outcome.kind === "updated") result.updated += 1;
else result.existing += 1;
result.reviewIssueIds.push(outcome.reviewIssueId);
} catch (err) {
result.failed += 1;
result.failedIssueIds.push(candidate.id);
logger.warn(
{
err,
companyId: candidate.companyId,
issueId: candidate.id,
requestDepth: candidate.requestDepth,
},
"productivity review reconciliation skipped malformed candidate",
);
}
}
return result;
}
async function isProductivityReviewContinuationHoldActive(input: {
companyId: string;
issueId: string;
agentId: string;
now?: Date;
thresholds?: Partial<ProductivityReviewThresholds>;
}) {
const now = input.now ?? new Date();
const thresholds = buildThresholds(input.thresholds);
const [sourceIssue, sourceAgent, openReview] = await Promise.all([
db
.select()
.from(issues)
.where(and(eq(issues.companyId, input.companyId), eq(issues.id, input.issueId)))
.then((rows) => rows[0] ?? null),
getAgent(input.agentId),
findOpenProductivityReview(input.companyId, input.issueId),
]);
if (!sourceIssue || !sourceAgent || !openReview) return { held: false as const };
if (sourceAgent.companyId !== input.companyId) return { held: false as const };
const evidence = await collectEvidence(sourceIssue, sourceAgent, thresholds, now);
if (!evidence || !isSoftStopTrigger(evidence.trigger)) return { held: false as const };
return {
held: true as const,
reviewIssueId: openReview.id,
reviewIdentifier: openReview.identifier,
trigger: evidence.trigger,
reason: evidence.triggerReasons.join("; "),
};
}
async function recordContinuationHold(input: {
companyId: string;
issueId: string;
runId: string;
agentId: string;
reviewIssueId: string;
trigger: ProductivityReviewTrigger;
reason: string;
}) {
await logActivity(db, {
companyId: input.companyId,
actorType: "system",
actorId: "system",
agentId: input.agentId,
runId: input.runId,
action: "issue.productivity_review_continuation_held",
entityType: "issue",
entityId: input.issueId,
details: {
source: "productivity_review.continuation_hold",
reviewIssueId: input.reviewIssueId,
trigger: input.trigger,
reason: input.reason,
},
});
}
return {
reconcileProductivityReviews,
isProductivityReviewContinuationHoldActive,
recordContinuationHold,
};
}

View file

@ -1,5 +1,6 @@
export const RECOVERY_ORIGIN_KINDS = {
issueGraphLivenessEscalation: "harness_liveness_escalation",
issueProductivityReview: "issue_productivity_review",
strandedIssueRecovery: "stranded_issue_recovery",
staleActiveRunEvaluation: "stale_active_run_evaluation",
} as const;

View file

@ -72,7 +72,7 @@ type RecoveryWakeup = (
type LatestIssueRun = Pick<
typeof heartbeatRuns.$inferSelect,
"id" | "agentId" | "status" | "error" | "errorCode" | "contextSnapshot"
"id" | "agentId" | "status" | "error" | "errorCode" | "contextSnapshot" | "livenessState"
> | null;
type WatchdogDecisionActor =
@ -188,6 +188,18 @@ function isUnsuccessfulTerminalIssueRun(latestRun: LatestIssueRun) {
);
}
function isSuccessfulInProgressContinuationRun(latestRun: LatestIssueRun) {
return latestRun?.status === "succeeded";
}
function isProductiveContinuationRun(latestRun: LatestIssueRun) {
return latestRun?.status === "succeeded" &&
(latestRun.livenessState === "advanced" ||
latestRun.livenessState === "completed" ||
latestRun.livenessState === "blocked" ||
latestRun.livenessState === "needs_followup");
}
function parseLivenessIncidentKey(incidentKey: string | null | undefined) {
if (!incidentKey) return null;
return parseIssueGraphLivenessIncidentKey(incidentKey);
@ -299,6 +311,7 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
error: heartbeatRuns.error,
errorCode: heartbeatRuns.errorCode,
contextSnapshot: heartbeatRuns.contextSnapshot,
livenessState: heartbeatRuns.livenessState,
})
.from(heartbeatRuns)
.where(
@ -1572,6 +1585,8 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
assignmentDispatched: 0,
dispatchRequeued: 0,
continuationRequeued: 0,
productiveContinuationObserved: 0,
successfulContinuationObserved: 0,
orphanBlockersAssigned: 0,
escalated: 0,
skipped: 0,
@ -1690,6 +1705,15 @@ export function recoveryService(db: Db, deps: { enqueueWakeup: RecoveryWakeup })
result.skipped += 1;
continue;
}
if (isSuccessfulInProgressContinuationRun(latestRun)) {
if (isProductiveContinuationRun(latestRun)) {
result.productiveContinuationObserved += 1;
} else {
result.successfulContinuationObserved += 1;
}
result.skipped += 1;
continue;
}
if (didAutomaticRecoveryFail(latestRun, "issue_continuation_needed")) {
const failureSummary = summarizeRunFailureForIssueComment(latestRun);
const updated = await escalateStrandedAssignedIssue({