Batch inline comment wake payloads

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-03-28 09:55:41 -05:00
parent e75960f284
commit 91e040a696
14 changed files with 1049 additions and 9 deletions

View file

@ -12,6 +12,7 @@ import {
agentWakeupRequests,
heartbeatRunEvents,
heartbeatRuns,
issueComments,
issues,
projects,
projectWorkspaces,
@ -66,10 +67,15 @@ const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1;
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
const WAKE_COMMENT_IDS_KEY = "wakeCommentIds";
const PAPERCLIP_WAKE_PAYLOAD_KEY = "paperclipWake";
const DETACHED_PROCESS_ERROR_CODE = "process_detached";
const startLocksByAgent = new Map<string, Promise<void>>();
const REPO_ONLY_CWD_SENTINEL = "/__paperclip_repo_only__";
const MANAGED_WORKSPACE_GIT_CLONE_TIMEOUT_MS = 10 * 60 * 1000;
const MAX_INLINE_WAKE_COMMENTS = 8;
const MAX_INLINE_WAKE_COMMENT_BODY_CHARS = 4_000;
const MAX_INLINE_WAKE_COMMENT_BODY_TOTAL_CHARS = 12_000;
const execFile = promisify(execFileCallback);
const SESSIONED_LOCAL_ADAPTERS = new Set([
"claude_local",
@ -685,7 +691,9 @@ function deriveCommentId(
contextSnapshot: Record<string, unknown> | null | undefined,
payload: Record<string, unknown> | null | undefined,
) {
const batchedCommentId = extractWakeCommentIds(contextSnapshot).at(-1);
return (
batchedCommentId ??
readNonEmptyString(contextSnapshot?.wakeCommentId) ??
readNonEmptyString(contextSnapshot?.commentId) ??
readNonEmptyString(payload?.commentId) ??
@ -693,6 +701,50 @@ function deriveCommentId(
);
}
export function extractWakeCommentIds(
contextSnapshot: Record<string, unknown> | null | undefined,
): string[] {
const raw = contextSnapshot?.[WAKE_COMMENT_IDS_KEY];
if (!Array.isArray(raw)) return [];
const out: string[] = [];
for (const entry of raw) {
const value = readNonEmptyString(entry);
if (!value || out.includes(value)) continue;
out.push(value);
}
return out;
}
function mergeWakeCommentIds(...values: Array<unknown>): string[] {
const merged: string[] = [];
const append = (value: unknown) => {
const normalized = readNonEmptyString(value);
if (!normalized || merged.includes(normalized)) return;
merged.push(normalized);
};
for (const value of values) {
if (Array.isArray(value)) {
for (const entry of value) append(entry);
continue;
}
if (typeof value === "object" && value !== null) {
const candidate = value as Record<string, unknown>;
const batched = extractWakeCommentIds(candidate);
if (batched.length > 0) {
for (const entry of batched) append(entry);
continue;
}
append(candidate.wakeCommentId);
append(candidate.commentId);
continue;
}
append(value);
}
return merged;
}
function enrichWakeContextSnapshot(input: {
contextSnapshot: Record<string, unknown>;
reason: string | null;
@ -705,6 +757,7 @@ function enrichWakeContextSnapshot(input: {
const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]);
const taskKey = deriveTaskKey(contextSnapshot, payload);
const wakeCommentId = deriveCommentId(contextSnapshot, payload);
const wakeCommentIds = mergeWakeCommentIds(contextSnapshot, commentIdFromPayload);
if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
contextSnapshot.wakeReason = reason;
@ -721,7 +774,13 @@ function enrichWakeContextSnapshot(input: {
if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) {
contextSnapshot.commentId = commentIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) {
if (wakeCommentIds.length > 0) {
const latestCommentId = wakeCommentIds[wakeCommentIds.length - 1];
contextSnapshot[WAKE_COMMENT_IDS_KEY] = wakeCommentIds;
contextSnapshot.commentId = latestCommentId;
contextSnapshot.wakeCommentId = latestCommentId;
delete contextSnapshot[PAPERCLIP_WAKE_PAYLOAD_KEY];
} else if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) {
contextSnapshot.wakeCommentId = wakeCommentId;
}
if (!readNonEmptyString(contextSnapshot["wakeSource"]) && source) {
@ -740,7 +799,7 @@ function enrichWakeContextSnapshot(input: {
};
}
function mergeCoalescedContextSnapshot(
export function mergeCoalescedContextSnapshot(
existingRaw: unknown,
incoming: Record<string, unknown>,
) {
@ -749,14 +808,136 @@ function mergeCoalescedContextSnapshot(
...existing,
...incoming,
};
const commentId = deriveCommentId(incoming, null);
if (commentId) {
merged.commentId = commentId;
merged.wakeCommentId = commentId;
const mergedCommentIds = mergeWakeCommentIds(existing, incoming);
if (mergedCommentIds.length > 0) {
const latestCommentId = mergedCommentIds[mergedCommentIds.length - 1];
merged[WAKE_COMMENT_IDS_KEY] = mergedCommentIds;
merged.commentId = latestCommentId;
merged.wakeCommentId = latestCommentId;
delete merged[PAPERCLIP_WAKE_PAYLOAD_KEY];
}
return merged;
}
async function buildPaperclipWakePayload(input: {
db: Db;
companyId: string;
contextSnapshot: Record<string, unknown>;
issueSummary?:
| {
id: string;
identifier: string | null;
title: string;
status: string;
priority: string;
}
| null;
}) {
const commentIds = extractWakeCommentIds(input.contextSnapshot);
if (commentIds.length === 0) return null;
const issueId = readNonEmptyString(input.contextSnapshot.issueId);
const issueSummary =
input.issueSummary ??
(issueId
? await input.db
.select({
id: issues.id,
identifier: issues.identifier,
title: issues.title,
status: issues.status,
priority: issues.priority,
})
.from(issues)
.where(and(eq(issues.id, issueId), eq(issues.companyId, input.companyId)))
.then((rows) => rows[0] ?? null)
: null);
const commentRows = await input.db
.select({
id: issueComments.id,
issueId: issueComments.issueId,
body: issueComments.body,
authorAgentId: issueComments.authorAgentId,
authorUserId: issueComments.authorUserId,
createdAt: issueComments.createdAt,
})
.from(issueComments)
.where(
and(
eq(issueComments.companyId, input.companyId),
inArray(issueComments.id, commentIds),
),
);
const commentsById = new Map(commentRows.map((comment) => [comment.id, comment]));
const comments: Array<Record<string, unknown>> = [];
let remainingBodyChars = MAX_INLINE_WAKE_COMMENT_BODY_TOTAL_CHARS;
let truncated = false;
let missingCommentCount = 0;
for (const commentId of commentIds) {
const row = commentsById.get(commentId);
if (!row) {
truncated = true;
missingCommentCount += 1;
continue;
}
if (comments.length >= MAX_INLINE_WAKE_COMMENTS) {
truncated = true;
break;
}
const fullBody = row.body;
const allowedBodyChars = Math.min(MAX_INLINE_WAKE_COMMENT_BODY_CHARS, remainingBodyChars);
if (allowedBodyChars <= 0) {
truncated = true;
break;
}
const body = fullBody.length > allowedBodyChars ? fullBody.slice(0, allowedBodyChars) : fullBody;
const bodyTruncated = body.length < fullBody.length;
if (bodyTruncated) truncated = true;
remainingBodyChars -= body.length;
comments.push({
id: row.id,
issueId: row.issueId,
body,
bodyTruncated,
createdAt: row.createdAt.toISOString(),
author: row.authorAgentId
? { type: "agent", id: row.authorAgentId }
: row.authorUserId
? { type: "user", id: row.authorUserId }
: { type: "system", id: null },
});
}
return {
reason: readNonEmptyString(input.contextSnapshot.wakeReason),
issue: issueSummary
? {
id: issueSummary.id,
identifier: issueSummary.identifier,
title: issueSummary.title,
status: issueSummary.status,
priority: issueSummary.priority,
}
: null,
commentIds,
latestCommentId: commentIds[commentIds.length - 1] ?? null,
comments,
commentWindow: {
requestedCount: commentIds.length,
includedCount: comments.length,
missingCount: missingCommentCount,
},
truncated,
fallbackFetchNeeded: truncated || missingCommentCount > 0,
};
}
function runTaskKey(run: typeof heartbeatRuns.$inferSelect) {
return deriveTaskKey(run.contextSnapshot as Record<string, unknown> | null, null);
}
@ -2098,6 +2279,8 @@ export function heartbeatService(db: Db) {
id: issues.id,
identifier: issues.identifier,
title: issues.title,
status: issues.status,
priority: issues.priority,
projectId: issues.projectId,
projectWorkspaceId: issues.projectWorkspaceId,
executionWorkspaceId: issues.executionWorkspaceId,
@ -2168,12 +2351,33 @@ export function heartbeatService(db: Db) {
id: issueContext.id,
identifier: issueContext.identifier,
title: issueContext.title,
status: issueContext.status,
priority: issueContext.priority,
projectId: issueContext.projectId,
projectWorkspaceId: issueContext.projectWorkspaceId,
executionWorkspaceId: issueContext.executionWorkspaceId,
executionWorkspacePreference: issueContext.executionWorkspacePreference,
}
: null;
const paperclipWakePayload = await buildPaperclipWakePayload({
db,
companyId: agent.companyId,
contextSnapshot: context,
issueSummary: issueRef
? {
id: issueRef.id,
identifier: issueRef.identifier,
title: issueRef.title,
status: issueRef.status,
priority: issueRef.priority,
}
: null,
});
if (paperclipWakePayload) {
context[PAPERCLIP_WAKE_PAYLOAD_KEY] = paperclipWakePayload;
} else {
delete context[PAPERCLIP_WAKE_PAYLOAD_KEY];
}
const existingExecutionWorkspace =
issueRef?.executionWorkspaceId ? await executionWorkspacesSvc.getById(issueRef.executionWorkspaceId) : null;
const shouldReuseExisting =