import { and, asc, desc, eq, inArray, sql } from "drizzle-orm"; import type { Db } from "@paperclipai/db"; import { documentAnnotationAnchorSnapshots, documentAnnotationComments, documentAnnotationThreads, documents, issueDocuments, } from "@paperclipai/db"; import { anchorSnapshotToSelector, remapDocumentAnchor, selectorToAnchorSnapshot, verifyDocumentAnchorSelector, type DocumentAnnotationAnchorSnapshot, type DocumentAnnotationComment, type DocumentAnnotationThread, CreateDocumentAnnotationComment, CreateDocumentAnnotationThread, UpdateDocumentAnnotationThread, } from "@paperclipai/shared"; import { conflict, notFound, unprocessable } from "../errors.js"; type ActorInput = { actorType: "agent" | "user"; actorId: string; agentId?: string | null; userId?: string | null; runId?: string | null; }; type IssueDocumentRow = { issueId: string; companyId: string; documentId: string; documentKey: string; latestBody: string; latestRevisionId: string | null; latestRevisionNumber: number; }; const threadSelect = { id: documentAnnotationThreads.id, companyId: documentAnnotationThreads.companyId, issueId: documentAnnotationThreads.issueId, documentId: documentAnnotationThreads.documentId, documentKey: documentAnnotationThreads.documentKey, status: documentAnnotationThreads.status, anchorState: documentAnnotationThreads.anchorState, anchorConfidence: documentAnnotationThreads.anchorConfidence, originalRevisionId: documentAnnotationThreads.originalRevisionId, originalRevisionNumber: documentAnnotationThreads.originalRevisionNumber, currentRevisionId: documentAnnotationThreads.currentRevisionId, currentRevisionNumber: documentAnnotationThreads.currentRevisionNumber, selectedText: documentAnnotationThreads.selectedText, prefixText: documentAnnotationThreads.prefixText, suffixText: documentAnnotationThreads.suffixText, normalizedStart: documentAnnotationThreads.normalizedStart, normalizedEnd: documentAnnotationThreads.normalizedEnd, markdownStart: documentAnnotationThreads.markdownStart, markdownEnd: documentAnnotationThreads.markdownEnd, anchorSelector: documentAnnotationThreads.anchorSelector, createdByAgentId: documentAnnotationThreads.createdByAgentId, createdByUserId: documentAnnotationThreads.createdByUserId, resolvedByAgentId: documentAnnotationThreads.resolvedByAgentId, resolvedByUserId: documentAnnotationThreads.resolvedByUserId, resolvedAt: documentAnnotationThreads.resolvedAt, createdAt: documentAnnotationThreads.createdAt, updatedAt: documentAnnotationThreads.updatedAt, }; const commentSelect = { id: documentAnnotationComments.id, companyId: documentAnnotationComments.companyId, threadId: documentAnnotationComments.threadId, issueId: documentAnnotationComments.issueId, documentId: documentAnnotationComments.documentId, body: documentAnnotationComments.body, authorType: documentAnnotationComments.authorType, authorAgentId: documentAnnotationComments.authorAgentId, authorUserId: documentAnnotationComments.authorUserId, createdByRunId: documentAnnotationComments.createdByRunId, createdAt: documentAnnotationComments.createdAt, updatedAt: documentAnnotationComments.updatedAt, }; function snapshotFromThread(thread: Pick): DocumentAnnotationAnchorSnapshot { return { selectedText: thread.selectedText, prefixText: thread.prefixText, suffixText: thread.suffixText, normalizedStart: thread.normalizedStart, normalizedEnd: thread.normalizedEnd, markdownStart: thread.markdownStart, markdownEnd: thread.markdownEnd, }; } export function documentAnnotationService(db: Db) { async function getIssueDocument(issueId: string, key: string, dbOrTx: any = db): Promise { return dbOrTx .select({ issueId: issueDocuments.issueId, companyId: documents.companyId, documentId: documents.id, documentKey: issueDocuments.key, latestBody: documents.latestBody, latestRevisionId: documents.latestRevisionId, latestRevisionNumber: documents.latestRevisionNumber, }) .from(issueDocuments) .innerJoin(documents, eq(issueDocuments.documentId, documents.id)) .where(and(eq(issueDocuments.issueId, issueId), eq(issueDocuments.key, key))) .then((rows: IssueDocumentRow[]) => rows[0] ?? null); } async function getThreadForIssue( issueId: string, documentKey: string, threadId: string, dbOrTx: any = db, ): Promise { return dbOrTx .select(threadSelect) .from(documentAnnotationThreads) .where(and( eq(documentAnnotationThreads.id, threadId), eq(documentAnnotationThreads.issueId, issueId), eq(documentAnnotationThreads.documentKey, documentKey), )) .then((rows: DocumentAnnotationThread[]) => rows[0] ?? null); } async function commentsForThreads(threadIds: string[], dbOrTx: any = db): Promise { if (threadIds.length === 0) return []; return dbOrTx .select(commentSelect) .from(documentAnnotationComments) .where(inArray(documentAnnotationComments.threadId, threadIds)) .orderBy(asc(documentAnnotationComments.createdAt), asc(documentAnnotationComments.id)); } return { listThreadsForIssueDocument: async ( issueId: string, key: string, options: { status?: "open" | "resolved" | "all"; includeComments?: boolean } = {}, ) => { const doc = await getIssueDocument(issueId, key); if (!doc) throw notFound("Document not found"); const conditions = [ eq(documentAnnotationThreads.issueId, issueId), eq(documentAnnotationThreads.documentId, doc.documentId), ]; if (options.status && options.status !== "all") { conditions.push(eq(documentAnnotationThreads.status, options.status)); } const threads: DocumentAnnotationThread[] = await db .select(threadSelect) .from(documentAnnotationThreads) .where(and(...conditions)) .orderBy(desc(documentAnnotationThreads.updatedAt), desc(documentAnnotationThreads.id)); if (!options.includeComments) return threads; const comments = await commentsForThreads(threads.map((thread) => thread.id)); const commentsByThread = new Map(); for (const comment of comments) { const existing = commentsByThread.get(comment.threadId) ?? []; existing.push(comment); commentsByThread.set(comment.threadId, existing); } return threads.map((thread) => ({ ...thread, comments: commentsByThread.get(thread.id) ?? [], })); }, getThreadForIssueDocument: async (issueId: string, key: string, threadId: string) => { const thread = await getThreadForIssue(issueId, key, threadId); if (!thread) return null; const comments = await commentsForThreads([thread.id]); return { ...thread, comments }; }, createThread: async ( issueId: string, key: string, input: CreateDocumentAnnotationThread, actor: ActorInput, ) => db.transaction(async (tx) => { await tx.execute(sql` select ${documents.id} from ${issueDocuments} inner join ${documents} on ${issueDocuments.documentId} = ${documents.id} where ${and(eq(issueDocuments.issueId, issueId), eq(issueDocuments.key, key))} for update of ${documents} `); const doc = await getIssueDocument(issueId, key, tx); if (!doc) throw notFound("Document not found"); if ( input.baseRevisionId !== doc.latestRevisionId || input.baseRevisionNumber !== doc.latestRevisionNumber ) { throw conflict("Annotation anchor requires the current document revision", { currentRevisionId: doc.latestRevisionId, currentRevisionNumber: doc.latestRevisionNumber, }); } const verification = verifyDocumentAnchorSelector({ markdown: doc.latestBody, selector: input.selector, }); if (!verification.ok || !verification.anchor) { throw unprocessable("Annotation anchor does not match the current document revision", { reason: verification.reason, }); } const now = new Date(); const [thread] = await tx .insert(documentAnnotationThreads) .values({ companyId: doc.companyId, issueId, documentId: doc.documentId, documentKey: doc.documentKey, status: "open", anchorState: "active", anchorConfidence: "exact", originalRevisionId: doc.latestRevisionId, originalRevisionNumber: doc.latestRevisionNumber, currentRevisionId: doc.latestRevisionId, currentRevisionNumber: doc.latestRevisionNumber, selectedText: verification.anchor.selectedText, prefixText: verification.anchor.prefixText, suffixText: verification.anchor.suffixText, normalizedStart: verification.anchor.normalizedStart, normalizedEnd: verification.anchor.normalizedEnd, markdownStart: verification.anchor.markdownStart, markdownEnd: verification.anchor.markdownEnd, anchorSelector: input.selector, createdByAgentId: actor.agentId ?? null, createdByUserId: actor.userId ?? null, createdAt: now, updatedAt: now, }) .returning(threadSelect); const [comment] = await tx .insert(documentAnnotationComments) .values({ companyId: doc.companyId, threadId: thread.id, issueId, documentId: doc.documentId, body: input.body, authorType: actor.actorType, authorAgentId: actor.agentId ?? null, authorUserId: actor.userId ?? null, createdByRunId: actor.runId ?? null, createdAt: now, updatedAt: now, }) .returning(commentSelect); return { ...thread, comments: [comment] }; }), addComment: async ( issueId: string, key: string, threadId: string, input: CreateDocumentAnnotationComment, actor: ActorInput, ) => db.transaction(async (tx) => { const thread = await getThreadForIssue(issueId, key, threadId, tx); if (!thread) throw notFound("Annotation thread not found"); const now = new Date(); const [comment] = await tx .insert(documentAnnotationComments) .values({ companyId: thread.companyId, threadId: thread.id, issueId: thread.issueId, documentId: thread.documentId, body: input.body, authorType: actor.actorType, authorAgentId: actor.agentId ?? null, authorUserId: actor.userId ?? null, createdByRunId: actor.runId ?? null, createdAt: now, updatedAt: now, }) .returning(commentSelect); await tx .update(documentAnnotationThreads) .set({ updatedAt: now }) .where(eq(documentAnnotationThreads.id, thread.id)); return comment; }), updateThread: async ( issueId: string, key: string, threadId: string, input: UpdateDocumentAnnotationThread, actor: ActorInput, ) => db.transaction(async (tx) => { const thread = await getThreadForIssue(issueId, key, threadId, tx); if (!thread) throw notFound("Annotation thread not found"); if (!input.status || input.status === thread.status) return thread; const now = new Date(); const [updated] = await tx .update(documentAnnotationThreads) .set(input.status === "resolved" ? { status: "resolved", resolvedByAgentId: actor.agentId ?? null, resolvedByUserId: actor.userId ?? null, resolvedAt: now, updatedAt: now, } : { status: "open", resolvedByAgentId: null, resolvedByUserId: null, resolvedAt: null, updatedAt: now, }) .where(eq(documentAnnotationThreads.id, thread.id)) .returning(threadSelect); return updated; }), remapOpenThreadsForDocument: async (input: { issueId: string; key: string; documentId: string; nextRevisionId: string | null; nextRevisionNumber: number; nextBody: string; }) => db.transaction(async (tx) => { const threads: DocumentAnnotationThread[] = await tx .select(threadSelect) .from(documentAnnotationThreads) .where(and( eq(documentAnnotationThreads.issueId, input.issueId), eq(documentAnnotationThreads.documentId, input.documentId), eq(documentAnnotationThreads.status, "open"), )); const changed = []; const now = new Date(); for (const thread of threads) { if (thread.currentRevisionId === input.nextRevisionId) continue; const previousAnchor = snapshotFromThread(thread); const remap = remapDocumentAnchor({ previousAnchor, nextMarkdown: input.nextBody, }); const nextAnchor = remap.anchor; const nextSelector = nextAnchor ? anchorSnapshotToSelector(nextAnchor) : thread.anchorSelector; const [updated] = await tx .update(documentAnnotationThreads) .set({ currentRevisionId: input.nextRevisionId, currentRevisionNumber: input.nextRevisionNumber, anchorState: remap.anchorState, anchorConfidence: remap.confidence, ...(nextAnchor ? { selectedText: nextAnchor.selectedText, prefixText: nextAnchor.prefixText, suffixText: nextAnchor.suffixText, normalizedStart: nextAnchor.normalizedStart, normalizedEnd: nextAnchor.normalizedEnd, markdownStart: nextAnchor.markdownStart, markdownEnd: nextAnchor.markdownEnd, } : {}), anchorSelector: nextSelector, updatedAt: now, }) .where(eq(documentAnnotationThreads.id, thread.id)) .returning(threadSelect); const [snapshot] = await tx .insert(documentAnnotationAnchorSnapshots) .values({ companyId: thread.companyId, threadId: thread.id, documentId: thread.documentId, fromRevisionId: thread.currentRevisionId, fromRevisionNumber: thread.currentRevisionNumber, toRevisionId: input.nextRevisionId, toRevisionNumber: input.nextRevisionNumber, previousAnchor, nextAnchor, anchorState: remap.anchorState, anchorConfidence: remap.confidence, failureReason: remap.anchor ? null : remap.reason, createdAt: now, }) .returning(); changed.push({ thread: updated, snapshot }); } return changed; }), selectorToAnchorSnapshot, }; }