From b0c38705cecce7cafce051a10b08176282965205 Mon Sep 17 00:00:00 2001 From: Paperclip Bot Date: Tue, 2 Jun 2026 07:00:21 +0000 Subject: [PATCH] Implement one-way Paperclip to Forgejo issue sync --- README.md | 13 +- migrations/001_initial.sql | 17 ++- src/config.ts | 20 ++- src/forgejo-client.ts | 56 +++++++ src/manifest.ts | 37 +++-- src/paperclip-issue-sync.ts | 208 ++++++++++++++++++++++++++ src/persistence.ts | 175 ++++++++++++++++------ src/reconciliation.ts | 20 +++ src/types.ts | 44 +++++- src/webhook-intake.ts | 153 ------------------- src/worker.ts | 23 +-- tests/paperclip-issue-sync.spec.ts | 228 +++++++++++++++++++++++++++++ 12 files changed, 746 insertions(+), 248 deletions(-) create mode 100644 src/forgejo-client.ts create mode 100644 src/paperclip-issue-sync.ts delete mode 100644 src/webhook-intake.ts create mode 100644 tests/paperclip-issue-sync.spec.ts diff --git a/README.md b/README.md index 43dbdfa..259608b 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Forgejo Issue Sync Plugin -Scaffold for a Paperclip plugin that will sync Forgejo issues/comments while enforcing the v1 policy from `PRIA-13`: +Paperclip plugin for one-way Paperclip issue -> Forgejo issue creation while enforcing the v1 attachment policy: -- webhook intake stays inside the plugin worker +- selected Paperclip issues are detected inside the plugin worker - scheduled reconciliation stays in plugin `jobs` - mappings, dedupe, and review state stay in the plugin database/state - attachment handling is metadata-only @@ -12,15 +12,16 @@ Scaffold for a Paperclip plugin that will sync Forgejo issues/comments while enf - `src/manifest.ts`: manifest, capabilities, jobs, webhook declaration, instance config schema - `src/worker.ts`: plugin bootstrap, health, config validation, data/action registration -- `src/webhook-intake.ts`: webhook verification, normalization, dedupe recording, manual-review queueing +- `src/paperclip-issue-sync.ts`: issue selection, payload shaping, and outbound sync flow +- `src/forgejo-client.ts`: outbound Forgejo API client - `src/reconciliation.ts`: scheduled reconciliation job and instance-level last-run state -- `src/persistence.ts`: namespace-local persistence helpers for mappings, deliveries, reviews, and run snapshots +- `src/persistence.ts`: namespace-local persistence helpers for mappings, reviews, and run snapshots - `src/attachment-policy.ts`: metadata-only attachment policy and synced markdown formatter - `migrations/001_initial.sql`: plugin-owned tables for mappings, dedupe, review queue, and reconciliation history ## Attachment Policy -This scaffold deliberately does not fetch attachment bytes and does not add any path that calls `/api/attachments/{id}/content`. +This plugin deliberately does not fetch attachment bytes and does not add any path that calls `/api/attachments/{id}/content`. Instead it: @@ -31,7 +32,7 @@ Instead it: ## Follow-Up Needed -The plugin now emits `attachments_context_required` as a durable review signal, but the human-review destination is still a follow-up decision: +The plugin emits `attachments_context_required` as a durable review signal, but the human-review destination is still a follow-up decision: - create Paperclip-visible review issues/comments - expose a plugin UI or scoped API route for triage diff --git a/migrations/001_initial.sql b/migrations/001_initial.sql index 174f5ed..e4f2cd6 100644 --- a/migrations/001_initial.sql +++ b/migrations/001_initial.sql @@ -16,15 +16,24 @@ CREATE TABLE webhook_deliveries ( CREATE TABLE issue_mappings ( company_id uuid NOT NULL, - source_id text NOT NULL, + paperclip_issue_id uuid NOT NULL, + forgejo_issue_id bigint, + forgejo_issue_number integer, + forgejo_issue_url text, + forgejo_api_url text, + repo_owner text NOT NULL, + repo_name text NOT NULL, dedupe_key text NOT NULL, - title text, - body text NOT NULL, + source_title text NOT NULL, + source_body text NOT NULL, attachment_metadata jsonb NOT NULL DEFAULT '[]'::jsonb, manual_review_required boolean NOT NULL DEFAULT false, review_reason_code text, + sync_status text NOT NULL DEFAULT 'pending', + last_error text, + created_at timestamptz NOT NULL DEFAULT now(), updated_at timestamptz NOT NULL DEFAULT now(), - PRIMARY KEY (company_id, source_id) + PRIMARY KEY (company_id, paperclip_issue_id) ); CREATE TABLE comment_mappings ( diff --git a/src/config.ts b/src/config.ts index 3d11a41..d640264 100644 --- a/src/config.ts +++ b/src/config.ts @@ -9,8 +9,10 @@ export function readConfig(raw: Record): ForgejoPluginConfig { return { forgejoBaseUrl: optionalString(raw.forgejoBaseUrl), forgejoTokenRef: optionalString(raw.forgejoTokenRef), - webhookSecretRef: optionalString(raw.webhookSecretRef), + forgejoOwner: optionalString(raw.forgejoOwner), + forgejoRepo: optionalString(raw.forgejoRepo), defaultCompanyId: optionalString(raw.defaultCompanyId), + syncIssueLabel: optionalString(raw.syncIssueLabel), reconciliationLookbackMinutes: typeof lookback === "number" && Number.isFinite(lookback) && lookback > 0 ? Math.floor(lookback) @@ -22,17 +24,27 @@ export function validateConfig(raw: Record): { ok: boolean; err const errors: string[] = []; const warnings: string[] = []; const config = readConfig(raw); + const outboundFields = [ + config.forgejoBaseUrl, + config.forgejoTokenRef, + config.forgejoOwner, + config.forgejoRepo + ]; if (config.forgejoBaseUrl && !/^https?:\/\//.test(config.forgejoBaseUrl)) { errors.push("forgejoBaseUrl must start with http:// or https://"); } - if (config.webhookSecretRef && !config.forgejoBaseUrl) { - warnings.push("webhookSecretRef is configured before forgejoBaseUrl; webhook auth is ready but outbound sync is not."); + if (outboundFields.some(Boolean) && outboundFields.some((value) => !value)) { + errors.push("forgejoBaseUrl, forgejoTokenRef, forgejoOwner, and forgejoRepo must all be configured together."); } if (!config.defaultCompanyId) { - warnings.push("defaultCompanyId is not set; webhook payloads must provide companyId metadata."); + warnings.push("defaultCompanyId is not set; the reconciliation job cannot backfill unsynced issues across a company."); + } + + if (!config.syncIssueLabel) { + warnings.push("syncIssueLabel is not set; the plugin will use the default label \"forgejo-sync\"."); } return { diff --git a/src/forgejo-client.ts b/src/forgejo-client.ts new file mode 100644 index 0000000..49d9aa5 --- /dev/null +++ b/src/forgejo-client.ts @@ -0,0 +1,56 @@ +import type { PluginContext } from "@paperclipai/plugin-sdk"; +import { readConfig } from "./config.js"; +import type { ForgejoIssuePayload, ForgejoIssueRecord } from "./types.js"; + +function joinUrl(baseUrl: string, path: string): string { + return `${baseUrl.replace(/\/+$/, "")}${path}`; +} + +export async function createForgejoIssue( + ctx: PluginContext, + payload: ForgejoIssuePayload +): Promise { + const config = readConfig(await ctx.config.get()); + if (!config.forgejoBaseUrl || !config.forgejoTokenRef || !config.forgejoOwner || !config.forgejoRepo) { + throw new Error("Forgejo outbound sync is not fully configured."); + } + + const token = await ctx.secrets.resolve(config.forgejoTokenRef); + const response = await ctx.http.fetch( + joinUrl(config.forgejoBaseUrl, `/api/v1/repos/${encodeURIComponent(config.forgejoOwner)}/${encodeURIComponent(config.forgejoRepo)}/issues`), + { + method: "POST", + headers: { + authorization: `token ${token}`, + "content-type": "application/json", + accept: "application/json" + }, + body: JSON.stringify({ + title: payload.title, + body: payload.body + }) + } + ); + + if (!response.ok) { + const responseText = await response.text(); + throw new Error(`Forgejo issue creation failed (${response.status}): ${responseText || response.statusText}`); + } + + const body = await response.json() as Record; + const id = Number(body.id); + const number = Number(body.number); + const url = typeof body.html_url === "string" ? body.html_url : null; + const apiUrl = typeof body.url === "string" ? body.url : null; + + if (!Number.isFinite(id) || !Number.isFinite(number) || !url || !apiUrl) { + throw new Error("Forgejo issue creation returned an incomplete response."); + } + + return { + id, + number, + url, + apiUrl + }; +} diff --git a/src/manifest.ts b/src/manifest.ts index eed4dec..8071001 100644 --- a/src/manifest.ts +++ b/src/manifest.ts @@ -1,12 +1,12 @@ import type { PaperclipPluginManifestV1 } from "@paperclipai/plugin-sdk"; -import { JOB_KEYS, PLUGIN_ID, PLUGIN_VERSION, WEBHOOK_KEYS } from "./constants.js"; +import { JOB_KEYS, PLUGIN_ID, PLUGIN_VERSION } from "./constants.js"; const manifest: PaperclipPluginManifestV1 = { id: PLUGIN_ID, apiVersion: 1, version: PLUGIN_VERSION, displayName: "Forgejo Issue Sync", - description: "Scaffold for Forgejo issue sync with webhook intake, scheduled reconciliation, and metadata-only attachment handling.", + description: "Creates Forgejo issues from selected Paperclip issues with durable mapping, scheduled reconciliation, and metadata-only attachment handling.", author: "Private Adoption Company", categories: ["connector", "automation"], capabilities: [ @@ -14,13 +14,14 @@ const manifest: PaperclipPluginManifestV1 = { "database.namespace.migrate", "database.namespace.read", "database.namespace.write", + "events.subscribe", "http.outbound", "instance.settings.register", + "issues.read", "jobs.schedule", "plugin.state.read", "plugin.state.write", - "secrets.read-ref", - "webhooks.receive" + "secrets.read-ref" ], entrypoints: { worker: "./dist/worker.js" @@ -38,15 +39,26 @@ const manifest: PaperclipPluginManifestV1 = { title: "Forgejo Token Secret Ref", description: "Secret reference for outbound Forgejo API authentication." }, - webhookSecretRef: { + forgejoOwner: { type: "string", - title: "Webhook Secret Ref", - description: "Secret reference used to verify webhook signatures." + title: "Forgejo Owner", + description: "Forgejo owner or organization that will own created issues." + }, + forgejoRepo: { + type: "string", + title: "Forgejo Repository", + description: "Repository name that will receive created issues." }, defaultCompanyId: { type: "string", title: "Default Company ID", - description: "Fallback company used when webhook payloads do not carry explicit Paperclip company metadata." + description: "Company scanned by the reconciliation job for unsynced eligible issues." + }, + syncIssueLabel: { + type: "string", + title: "Sync Issue Label", + default: "forgejo-sync", + description: "Only Paperclip issues with this label are created in Forgejo." }, reconciliationLookbackMinutes: { type: "number", @@ -65,16 +77,9 @@ const manifest: PaperclipPluginManifestV1 = { { jobKey: JOB_KEYS.reconcile, displayName: "Reconcile Forgejo Sync Drift", - description: "Reconciles stored webhook deliveries, mapping rows, and pending manual-review items.", + description: "Backfills eligible Paperclip issues and records sync health snapshots.", schedule: "0 * * * *" } - ], - webhooks: [ - { - endpointKey: WEBHOOK_KEYS.forgejo, - displayName: "Forgejo Events", - description: "Receives Forgejo issue and comment webhook deliveries for normalization and dedupe." - } ] }; diff --git a/src/paperclip-issue-sync.ts b/src/paperclip-issue-sync.ts new file mode 100644 index 0000000..cfe844a --- /dev/null +++ b/src/paperclip-issue-sync.ts @@ -0,0 +1,208 @@ +import type { PluginContext } from "@paperclipai/plugin-sdk"; +import type { Issue } from "@paperclipai/shared"; +import { buildForgejoSyncContent } from "./attachment-policy.js"; +import { readConfig } from "./config.js"; +import { createForgejoIssue } from "./forgejo-client.js"; +import { + completeIssueMapping, + enqueueManualReview, + failIssueMapping, + reserveIssueMapping +} from "./persistence.js"; +import type { + AttachmentMetadata, + ForgejoIssuePayload, + ForgejoPluginConfig, + ForgejoIssueRecord, + IssueReservationResult +} from "./types.js"; + +export const DEFAULT_SYNC_LABEL = "forgejo-sync"; + +type ArtifactWorkProduct = { + id: string; + type: string; + title: string; + url: string | null; + externalId: string | null; + metadata: Record | null; +}; + +type IssueSyncDependencies = { + reserve: (ctx: PluginContext, draft: ReturnType) => Promise; + createRemoteIssue: (ctx: PluginContext, payload: ForgejoIssuePayload) => Promise; + complete: ( + ctx: PluginContext, + companyId: string, + issueId: string, + remoteIssue: ForgejoIssueRecord + ) => Promise; + fail: (ctx: PluginContext, companyId: string, issueId: string, errorMessage: string) => Promise; + queueManualReview: (ctx: PluginContext, draft: ReturnType) => Promise; +}; + +const defaultDependencies: IssueSyncDependencies = { + reserve: reserveIssueMapping, + createRemoteIssue: createForgejoIssue, + complete: completeIssueMapping, + fail: failIssueMapping, + queueManualReview: enqueueManualReview +}; + +function asString(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value : null; +} + +function asNumber(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function isArtifactMetadata(value: Record | null): boolean { + return Boolean( + value + && (typeof value.attachmentId === "string" + || typeof value.originalFilename === "string" + || typeof value.contentType === "string" + || typeof value.byteSize === "number") + ); +} + +export function extractAttachmentMetadata(issue: Issue): AttachmentMetadata[] { + const workProducts = (issue as Issue & { workProducts?: ArtifactWorkProduct[] }).workProducts ?? []; + return workProducts + .filter((workProduct) => workProduct.type === "artifact") + .map((workProduct) => { + const metadata = workProduct.metadata ?? {}; + const artifactMetadata = isArtifactMetadata(metadata) ? metadata : {}; + return { + filename: asString(artifactMetadata.originalFilename) ?? workProduct.title ?? null, + mimeType: asString(artifactMetadata.contentType), + sizeBytes: asNumber(artifactMetadata.byteSize), + sourceUrl: workProduct.url, + sourceId: asString(artifactMetadata.attachmentId) ?? workProduct.externalId ?? workProduct.id + }; + }); +} + +export function isIssueSelected(issue: Issue, config: ForgejoPluginConfig): boolean { + const syncLabel = config.syncIssueLabel ?? DEFAULT_SYNC_LABEL; + const labels = issue.labels ?? []; + return labels.some((label) => label.name === syncLabel); +} + +export function buildForgejoIssuePayload(issue: Issue): ForgejoIssuePayload { + const attachments = extractAttachmentMetadata(issue); + const syncContent = buildForgejoSyncContent(issue.description ?? "", attachments); + const metadataLines = [ + `- Paperclip issue: ${issue.identifier ?? issue.id}`, + `- Status: ${issue.status}`, + `- Priority: ${issue.priority}` + ]; + + if (issue.project?.name) { + metadataLines.push(`- Project: ${issue.project.name}`); + } + + return { + title: issue.identifier ? `[${issue.identifier}] ${issue.title}` : issue.title, + body: [ + "Paperclip issue synced by Forgejo Issue Sync.", + "", + ...metadataLines, + "", + "## Description", + "", + syncContent.markdown, + "", + `` + ].join("\n").trim() + }; +} + +export function buildIssueSyncDraft(issue: Issue, config: ForgejoPluginConfig) { + if (!config.forgejoOwner || !config.forgejoRepo) { + throw new Error("Forgejo owner and repo must be configured before syncing issues."); + } + + const payload = buildForgejoIssuePayload(issue); + const attachments = extractAttachmentMetadata(issue); + const syncContent = buildForgejoSyncContent(issue.description ?? "", attachments); + return { + companyId: issue.companyId, + paperclipIssueId: issue.id, + repoOwner: config.forgejoOwner, + repoName: config.forgejoRepo, + dedupeKey: `paperclip-issue:${issue.companyId}:${issue.id}`, + sourceTitle: payload.title, + sourceBody: payload.body, + attachmentMetadata: syncContent.attachmentMetadata, + manualReviewRequired: syncContent.reviewSignal.manualReviewRequired, + reviewReasonCode: syncContent.reviewSignal.reasonCode + }; +} + +export async function syncIssueToForgejo( + ctx: PluginContext, + issue: Issue, + dependencies: Partial = {} +): Promise<"created" | "skipped" | "existing"> { + const config = readConfig(await ctx.config.get()); + if (!isIssueSelected(issue, config)) { + return "skipped"; + } + + const deps = { ...defaultDependencies, ...dependencies }; + const payload = buildForgejoIssuePayload(issue); + const draft = buildIssueSyncDraft(issue, config); + const reservation = await deps.reserve(ctx, draft); + + if (reservation.kind === "existing") { + return reservation.mapping.syncStatus === "synced" ? "existing" : "skipped"; + } + + try { + const remoteIssue = await deps.createRemoteIssue(ctx, payload); + await deps.complete(ctx, issue.companyId, issue.id, remoteIssue); + + if (draft.manualReviewRequired) { + await deps.queueManualReview(ctx, draft); + } + + await ctx.activity.log({ + companyId: issue.companyId, + entityType: "issue", + entityId: issue.id, + message: `Created Forgejo issue #${remoteIssue.number} for ${issue.identifier ?? issue.id}.`, + metadata: { + forgejoIssueId: remoteIssue.id, + forgejoIssueNumber: remoteIssue.number, + forgejoIssueUrl: remoteIssue.url, + manualReviewRequired: draft.manualReviewRequired + } + }); + + return "created"; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await deps.fail(ctx, issue.companyId, issue.id, message); + throw error; + } +} + +export async function syncIssueById(ctx: PluginContext, companyId: string, issueId: string): Promise<"created" | "skipped" | "existing"> { + const issue = await ctx.issues.get(issueId, companyId); + if (!issue) { + throw new Error(`Issue ${issueId} was not found in company ${companyId}.`); + } + return syncIssueToForgejo(ctx, issue); +} + +export function registerIssueSyncHandlers(ctx: PluginContext): void { + const handleEvent = async (event: { companyId: string; entityId?: string }) => { + if (!event.entityId) return; + await syncIssueById(ctx, event.companyId, event.entityId); + }; + + ctx.events.on("issue.created", handleEvent); + ctx.events.on("issue.updated", handleEvent); +} diff --git a/src/persistence.ts b/src/persistence.ts index 319b594..e0d6344 100644 --- a/src/persistence.ts +++ b/src/persistence.ts @@ -1,63 +1,142 @@ import type { PluginContext } from "@paperclipai/plugin-sdk"; -import type { NormalizedSyncCandidate, ReconciliationSnapshot } from "./types.js"; +import type { + ForgejoIssueRecord, + IssueMappingRecord, + IssueReservationResult, + IssueSyncDraft, + ReconciliationSnapshot +} from "./types.js"; function tableName(ctx: PluginContext, name: string): string { return `${ctx.db.namespace}.${name}`; } -export async function recordWebhookDelivery( +function toIssueMappingRecord(row: Record): IssueMappingRecord { + return { + companyId: String(row.company_id), + paperclipIssueId: String(row.paperclip_issue_id), + repoOwner: String(row.repo_owner), + repoName: String(row.repo_name), + dedupeKey: String(row.dedupe_key), + sourceTitle: String(row.source_title), + sourceBody: String(row.source_body), + attachmentMetadata: Array.isArray(row.attachment_metadata) ? row.attachment_metadata as IssueMappingRecord["attachmentMetadata"] : [], + manualReviewRequired: Boolean(row.manual_review_required), + reviewReasonCode: typeof row.review_reason_code === "string" ? row.review_reason_code : null, + forgejoIssueId: typeof row.forgejo_issue_id === "number" ? row.forgejo_issue_id : null, + forgejoIssueNumber: typeof row.forgejo_issue_number === "number" ? row.forgejo_issue_number : null, + forgejoIssueUrl: typeof row.forgejo_issue_url === "string" ? row.forgejo_issue_url : null, + forgejoApiUrl: typeof row.forgejo_api_url === "string" ? row.forgejo_api_url : null, + syncStatus: row.sync_status === "pending" || row.sync_status === "failed" ? row.sync_status : "synced", + lastError: typeof row.last_error === "string" ? row.last_error : null + }; +} + +export async function getIssueMapping( ctx: PluginContext, - input: { - requestId: string; - deliveryKey: string; - eventName: string; - companyId: string; - payload: unknown; + companyId: string, + paperclipIssueId: string +): Promise { + const [row] = await ctx.db.query>( + `SELECT * + FROM ${tableName(ctx, "issue_mappings")} + WHERE company_id = $1 AND paperclip_issue_id = $2`, + [companyId, paperclipIssueId] + ); + return row ? toIssueMappingRecord(row) : null; +} + +export async function reserveIssueMapping(ctx: PluginContext, draft: IssueSyncDraft): Promise { + const baseParams = [ + draft.companyId, + draft.paperclipIssueId, + draft.repoOwner, + draft.repoName, + draft.dedupeKey, + draft.sourceTitle, + draft.sourceBody, + JSON.stringify(draft.attachmentMetadata), + draft.manualReviewRequired, + draft.reviewReasonCode + ]; + + const insert = await ctx.db.execute( + `INSERT INTO ${tableName(ctx, "issue_mappings")} + (company_id, paperclip_issue_id, repo_owner, repo_name, dedupe_key, source_title, source_body, attachment_metadata, manual_review_required, review_reason_code, sync_status, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9, $10, 'pending', now(), now()) + ON CONFLICT (company_id, paperclip_issue_id) DO NOTHING`, + baseParams + ); + if (insert.rowCount > 0) { + return { kind: "reserved" }; } + + const retry = await ctx.db.execute( + `UPDATE ${tableName(ctx, "issue_mappings")} + SET repo_owner = $3, + repo_name = $4, + dedupe_key = $5, + source_title = $6, + source_body = $7, + attachment_metadata = $8::jsonb, + manual_review_required = $9, + review_reason_code = $10, + sync_status = 'pending', + last_error = NULL, + updated_at = now() + WHERE company_id = $1 + AND paperclip_issue_id = $2 + AND sync_status = 'failed'`, + baseParams + ); + if (retry.rowCount > 0) { + return { kind: "reserved" }; + } + + const existing = await getIssueMapping(ctx, draft.companyId, draft.paperclipIssueId); + if (!existing) { + throw new Error("Issue mapping reservation failed without returning an existing row."); + } + return { kind: "existing", mapping: existing }; +} + +export async function completeIssueMapping( + ctx: PluginContext, + companyId: string, + paperclipIssueId: string, + remoteIssue: ForgejoIssueRecord ): Promise { await ctx.db.execute( - `INSERT INTO ${tableName(ctx, "webhook_deliveries")} - (request_id, delivery_key, event_name, company_id, payload, status, received_at) - VALUES ($1, $2, $3, $4, $5::jsonb, 'received', now()) - ON CONFLICT (request_id) DO UPDATE SET - delivery_key = EXCLUDED.delivery_key, - event_name = EXCLUDED.event_name, - company_id = EXCLUDED.company_id, - payload = EXCLUDED.payload, - status = EXCLUDED.status, - received_at = now()`, - [input.requestId, input.deliveryKey, input.eventName, input.companyId, JSON.stringify(input.payload)] + `UPDATE ${tableName(ctx, "issue_mappings")} + SET forgejo_issue_id = $3, + forgejo_issue_number = $4, + forgejo_issue_url = $5, + forgejo_api_url = $6, + sync_status = 'synced', + last_error = NULL, + updated_at = now() + WHERE company_id = $1 AND paperclip_issue_id = $2`, + [companyId, paperclipIssueId, remoteIssue.id, remoteIssue.number, remoteIssue.url, remoteIssue.apiUrl] ); } -export async function recordSyncCandidate(ctx: PluginContext, candidate: NormalizedSyncCandidate): Promise { - const targetTable = candidate.sourceKind === "issue" ? "issue_mappings" : "comment_mappings"; +export async function failIssueMapping( + ctx: PluginContext, + companyId: string, + paperclipIssueId: string, + errorMessage: string +): Promise { await ctx.db.execute( - `INSERT INTO ${tableName(ctx, targetTable)} - (company_id, source_id, dedupe_key, title, body, attachment_metadata, manual_review_required, review_reason_code, updated_at) - VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7, $8, now()) - ON CONFLICT (company_id, source_id) DO UPDATE SET - dedupe_key = EXCLUDED.dedupe_key, - title = EXCLUDED.title, - body = EXCLUDED.body, - attachment_metadata = EXCLUDED.attachment_metadata, - manual_review_required = EXCLUDED.manual_review_required, - review_reason_code = EXCLUDED.review_reason_code, - updated_at = now()`, - [ - candidate.companyId, - candidate.sourceId, - candidate.dedupeKey, - candidate.title, - candidate.body, - JSON.stringify(candidate.attachmentMetadata), - candidate.reviewSignal.manualReviewRequired, - candidate.reviewSignal.reasonCode - ] + `UPDATE ${tableName(ctx, "issue_mappings")} + SET sync_status = 'failed', + last_error = $3, + updated_at = now() + WHERE company_id = $1 AND paperclip_issue_id = $2`, + [companyId, paperclipIssueId, errorMessage] ); } -export async function enqueueManualReview(ctx: PluginContext, candidate: NormalizedSyncCandidate): Promise { +export async function enqueueManualReview(ctx: PluginContext, candidate: IssueSyncDraft): Promise { await ctx.db.execute( `INSERT INTO ${tableName(ctx, "review_queue")} (company_id, source_kind, source_id, dedupe_key, review_reason_code, review_payload, status, created_at, updated_at) @@ -70,14 +149,14 @@ export async function enqueueManualReview(ctx: PluginContext, candidate: Normali updated_at = now()`, [ candidate.companyId, - candidate.sourceKind, - candidate.sourceId, + "issue", + candidate.paperclipIssueId, candidate.dedupeKey, - candidate.reviewSignal.reasonCode, + candidate.reviewReasonCode, JSON.stringify({ - reasons: candidate.reviewSignal.reasons, + reasons: candidate.reviewReasonCode ? [candidate.reviewReasonCode] : [], attachmentMetadata: candidate.attachmentMetadata, - title: candidate.title + title: candidate.sourceTitle }) ] ); diff --git a/src/reconciliation.ts b/src/reconciliation.ts index a705622..b9e45c6 100644 --- a/src/reconciliation.ts +++ b/src/reconciliation.ts @@ -1,5 +1,7 @@ import type { PluginContext, PluginJobContext } from "@paperclipai/plugin-sdk"; import { JOB_KEYS } from "./constants.js"; +import { readConfig } from "./config.js"; +import { isIssueSelected, syncIssueToForgejo } from "./paperclip-issue-sync.js"; import { recordReconciliationRun, readReconciliationSnapshot } from "./persistence.js"; import type { ReconciliationSnapshot } from "./types.js"; @@ -12,6 +14,24 @@ function instanceStateKey() { } export async function runReconciliation(ctx: PluginContext, trigger: string): Promise { + const config = readConfig(await ctx.config.get()); + if (config.defaultCompanyId) { + const lookbackMinutes = config.reconciliationLookbackMinutes ?? 60; + const threshold = Date.now() - lookbackMinutes * 60_000; + const issues = await ctx.issues.list({ + companyId: config.defaultCompanyId, + limit: 100 + }); + + for (const issue of issues) { + const updatedAt = issue.updatedAt instanceof Date ? issue.updatedAt.getTime() : new Date(issue.updatedAt).getTime(); + if (updatedAt < threshold || !isIssueSelected(issue, config)) { + continue; + } + await syncIssueToForgejo(ctx, issue); + } + } + const snapshot = await readReconciliationSnapshot(ctx); snapshot.trigger = trigger; snapshot.completedAt = new Date().toISOString(); diff --git a/src/types.ts b/src/types.ts index c0d9343..9bd7aec 100644 --- a/src/types.ts +++ b/src/types.ts @@ -43,7 +43,49 @@ export type ReconciliationSnapshot = { export type ForgejoPluginConfig = { forgejoBaseUrl?: string; forgejoTokenRef?: string; - webhookSecretRef?: string; + forgejoOwner?: string; + forgejoRepo?: string; defaultCompanyId?: string; + syncIssueLabel?: string; reconciliationLookbackMinutes?: number; }; + +export type PaperclipIssueAttachment = AttachmentMetadata; + +export type ForgejoIssuePayload = { + title: string; + body: string; +}; + +export type ForgejoIssueRecord = { + id: number; + number: number; + url: string; + apiUrl: string; +}; + +export type IssueSyncDraft = { + companyId: string; + paperclipIssueId: string; + repoOwner: string; + repoName: string; + dedupeKey: string; + sourceTitle: string; + sourceBody: string; + attachmentMetadata: AttachmentMetadata[]; + manualReviewRequired: boolean; + reviewReasonCode: string | null; +}; + +export type IssueMappingRecord = IssueSyncDraft & { + forgejoIssueId: number | null; + forgejoIssueNumber: number | null; + forgejoIssueUrl: string | null; + forgejoApiUrl: string | null; + syncStatus: "pending" | "synced" | "failed"; + lastError: string | null; +}; + +export type IssueReservationResult = + | { kind: "reserved" } + | { kind: "existing"; mapping: IssueMappingRecord }; diff --git a/src/webhook-intake.ts b/src/webhook-intake.ts deleted file mode 100644 index d70c2c0..0000000 --- a/src/webhook-intake.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { createHmac, timingSafeEqual } from "node:crypto"; -import type { PluginContext, PluginWebhookInput } from "@paperclipai/plugin-sdk"; -import { buildForgejoSyncContent } from "./attachment-policy.js"; -import { readConfig } from "./config.js"; -import { WEBHOOK_KEYS } from "./constants.js"; -import { enqueueManualReview, recordSyncCandidate, recordWebhookDelivery } from "./persistence.js"; -import type { AttachmentMetadata, NormalizedSyncCandidate } from "./types.js"; - -function firstHeader(headers: Record, key: string): string | null { - const match = Object.entries(headers).find(([headerKey]) => headerKey.toLowerCase() === key.toLowerCase()); - if (!match) return null; - const value = match[1]; - return Array.isArray(value) ? value[0] ?? null : value; -} - -function asRecord(value: unknown): Record | null { - return typeof value === "object" && value !== null && !Array.isArray(value) ? value as Record : null; -} - -function asString(value: unknown): string | null { - return typeof value === "string" && value.trim().length > 0 ? value : null; -} - -function asNumber(value: unknown): number | null { - return typeof value === "number" && Number.isFinite(value) ? value : null; -} - -function readAttachments(payload: Record): AttachmentMetadata[] { - const sources = [ - payload.attachments, - asRecord(payload.comment)?.attachments, - asRecord(payload.issue)?.attachments - ]; - const attachments = sources.find(Array.isArray); - if (!attachments) return []; - - return attachments.map((item) => { - const record = asRecord(item) ?? {}; - return { - filename: asString(record.name) ?? asString(record.filename), - mimeType: asString(record.content_type) ?? asString(record.mimeType), - sizeBytes: asNumber(record.size) ?? asNumber(record.sizeBytes), - sourceUrl: asString(record.browser_download_url) ?? asString(record.url), - sourceId: asString(record.uuid) ?? asString(record.id) - }; - }); -} - -function normalizePayload( - payload: Record, - companyId: string -): NormalizedSyncCandidate { - const issue = asRecord(payload.issue); - const comment = asRecord(payload.comment); - const sourceKind = comment ? "comment" : "issue"; - const sourceRecord = comment ?? issue ?? payload; - const sourceId = asString(sourceRecord.id) ?? asString(sourceRecord.uuid) ?? "unknown-source"; - const body = asString(sourceRecord.body) ?? asString(issue?.body) ?? ""; - const title = sourceKind === "issue" - ? asString(sourceRecord.title) ?? asString(payload.title) - : asString(issue?.title); - const attachments = readAttachments(payload); - const syncContent = buildForgejoSyncContent(body, attachments); - const dedupeKey = [ - asString(payload.repository?.toString?.()) ?? asString(asRecord(payload.repository)?.full_name) ?? "unknown-repo", - sourceKind, - sourceId, - asString(payload.action) ?? "unknown-action" - ].join(":"); - - return { - companyId, - sourceKind, - sourceId, - dedupeKey, - title, - body: syncContent.markdown, - attachmentMetadata: syncContent.attachmentMetadata, - reviewSignal: syncContent.reviewSignal, - rawPayload: payload - }; -} - -async function verifyWebhookSignature(ctx: PluginContext, input: PluginWebhookInput): Promise { - const config = readConfig(await ctx.config.get()); - if (!config.webhookSecretRef) return; - - const signatureHeader = firstHeader(input.headers, "x-hub-signature-256"); - if (!signatureHeader) { - throw new Error("Webhook signature is required when webhookSecretRef is configured."); - } - - const secret = await ctx.secrets.resolve(config.webhookSecretRef); - const expected = `sha256=${createHmac("sha256", secret).update(input.rawBody).digest("hex")}`; - const actualBuffer = Buffer.from(signatureHeader); - const expectedBuffer = Buffer.from(expected); - if (actualBuffer.length !== expectedBuffer.length || !timingSafeEqual(actualBuffer, expectedBuffer)) { - throw new Error("Webhook signature verification failed."); - } -} - -function resolveCompanyId(payload: Record, fallbackCompanyId?: string): string { - const payloadCompanyId = asString(payload.companyId) - ?? asString(asRecord(payload.paperclip)?.companyId) - ?? asString(asRecord(payload.meta)?.companyId); - if (payloadCompanyId) return payloadCompanyId; - if (fallbackCompanyId) return fallbackCompanyId; - throw new Error("Webhook payload does not include companyId and defaultCompanyId is not configured."); -} - -export async function handleForgejoWebhook(ctx: PluginContext, input: PluginWebhookInput): Promise { - if (input.endpointKey !== WEBHOOK_KEYS.forgejo) { - throw new Error(`Unsupported webhook endpoint "${input.endpointKey}"`); - } - - await verifyWebhookSignature(ctx, input); - const payload = asRecord(input.parsedBody) ?? {}; - const config = readConfig(await ctx.config.get()); - const companyId = resolveCompanyId(payload, config.defaultCompanyId); - const eventName = firstHeader(input.headers, "x-gitea-event") - ?? firstHeader(input.headers, "x-forgejo-event") - ?? "unknown"; - const deliveryKey = firstHeader(input.headers, "x-gitea-delivery") - ?? firstHeader(input.headers, "x-forgejo-delivery") - ?? input.requestId; - - await recordWebhookDelivery(ctx, { - requestId: input.requestId, - deliveryKey, - eventName, - companyId, - payload - }); - - const candidate = normalizePayload(payload, companyId); - await recordSyncCandidate(ctx, candidate); - - if (candidate.reviewSignal.manualReviewRequired) { - await enqueueManualReview(ctx, candidate); - await ctx.activity.log({ - companyId, - entityType: "plugin_review", - entityId: `${candidate.sourceKind}:${candidate.sourceId}`, - message: "Queued Forgejo sync payload for manual review because attachment context appears required.", - metadata: { - reasonCode: candidate.reviewSignal.reasonCode, - attachmentCount: candidate.reviewSignal.attachmentCount - } - }); - } - - return candidate; -} diff --git a/src/worker.ts b/src/worker.ts index 448165e..e94fc54 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,14 +1,15 @@ -import { definePlugin, runWorker, type PluginContext, type PluginWebhookInput } from "@paperclipai/plugin-sdk"; +import { definePlugin, runWorker, type PluginContext } from "@paperclipai/plugin-sdk"; import { validateConfig } from "./config.js"; +import { registerIssueSyncHandlers } from "./paperclip-issue-sync.js"; import { readReconciliationSnapshot } from "./persistence.js"; import { runReconciliation, registerReconciliationJob } from "./reconciliation.js"; -import { handleForgejoWebhook } from "./webhook-intake.js"; let currentContext: PluginContext | null = null; const plugin = definePlugin({ async setup(ctx) { currentContext = ctx; + registerIssueSyncHandlers(ctx); registerReconciliationJob(ctx); ctx.data.register("sync-health", async () => { const snapshot = await readReconciliationSnapshot(ctx); @@ -37,19 +38,9 @@ const plugin = definePlugin({ currentContext.logger.info("Forgejo sync config changed", { hasForgejoBaseUrl: Boolean(newConfig.forgejoBaseUrl), hasTokenRef: Boolean(newConfig.forgejoTokenRef), - hasWebhookSecretRef: Boolean(newConfig.webhookSecretRef) - }); - }, - - async onWebhook(input: PluginWebhookInput) { - if (!currentContext) { - throw new Error("Plugin context is not ready."); - } - const candidate = await handleForgejoWebhook(currentContext, input); - currentContext.logger.info("Processed Forgejo webhook delivery", { - sourceKind: candidate.sourceKind, - sourceId: candidate.sourceId, - manualReviewRequired: candidate.reviewSignal.manualReviewRequired + forgejoOwner: typeof newConfig.forgejoOwner === "string" ? newConfig.forgejoOwner : null, + forgejoRepo: typeof newConfig.forgejoRepo === "string" ? newConfig.forgejoRepo : null, + syncIssueLabel: typeof newConfig.syncIssueLabel === "string" ? newConfig.syncIssueLabel : null }); }, @@ -64,7 +55,7 @@ const plugin = definePlugin({ const snapshot = await readReconciliationSnapshot(currentContext); return { status: "ok" as const, - message: "Forgejo sync scaffold is ready", + message: "Forgejo issue sync is ready", details: { pendingReviews: snapshot.pendingReviews, pendingDeliveries: snapshot.pendingDeliveries, diff --git a/tests/paperclip-issue-sync.spec.ts b/tests/paperclip-issue-sync.spec.ts new file mode 100644 index 0000000..89f55ed --- /dev/null +++ b/tests/paperclip-issue-sync.spec.ts @@ -0,0 +1,228 @@ +import { describe, expect, it, vi } from "vitest"; +import type { Issue } from "@paperclipai/shared"; +import { ATTACHMENT_NOTE } from "../src/constants.js"; +import { + buildForgejoIssuePayload, + DEFAULT_SYNC_LABEL, + isIssueSelected, + syncIssueToForgejo +} from "../src/paperclip-issue-sync.js"; + +function buildIssue(overrides: Partial = {}): Issue { + return { + id: "issue-1", + companyId: "company-1", + projectId: null, + projectWorkspaceId: null, + goalId: null, + parentId: null, + title: "Fix Forgejo sync", + description: "See attached screenshot for the exact failure.", + status: "in_progress", + workMode: "standard", + priority: "medium", + assigneeAgentId: null, + assigneeUserId: null, + checkoutRunId: null, + executionRunId: null, + executionAgentNameKey: null, + executionLockedAt: null, + createdByAgentId: null, + createdByUserId: null, + issueNumber: 17, + identifier: "PRIA-17", + requestDepth: 0, + billingCode: null, + assigneeAdapterOverrides: null, + executionWorkspaceId: null, + executionWorkspacePreference: null, + executionWorkspaceSettings: null, + startedAt: null, + completedAt: null, + cancelledAt: null, + hiddenAt: null, + labels: [ + { + id: "label-1", + companyId: "company-1", + name: DEFAULT_SYNC_LABEL, + color: "#000000", + createdAt: new Date("2026-06-02T00:00:00Z"), + updatedAt: new Date("2026-06-02T00:00:00Z") + } + ], + blockedBy: [], + blocks: [], + project: { + id: "project-1", + companyId: "company-1", + name: "Forgejo Issue Sync Plugin", + description: null, + status: "planned", + primaryGoalId: null, + createdAt: new Date("2026-06-02T00:00:00Z"), + updatedAt: new Date("2026-06-02T00:00:00Z") + }, + goal: null, + currentExecutionWorkspace: null, + mentionedProjects: [], + myLastTouchAt: null, + lastExternalCommentAt: null, + lastActivityAt: null, + isUnreadForMe: false, + createdAt: new Date("2026-06-02T00:00:00Z"), + updatedAt: new Date("2026-06-02T00:00:00Z"), + ...overrides + } as Issue; +} + +describe("paperclip issue sync", () => { + it("selects issues by the configured sync label", () => { + const issue = buildIssue(); + expect(isIssueSelected(issue, {})).toBe(true); + expect(isIssueSelected(issue, { syncIssueLabel: "custom-sync" })).toBe(false); + }); + + it("builds a Forgejo issue body with metadata-only attachments", () => { + const issue = buildIssue({ + workProducts: [ + { + id: "wp-1", + companyId: "company-1", + projectId: null, + issueId: "issue-1", + executionWorkspaceId: null, + runtimeServiceId: null, + type: "artifact", + provider: "paperclip", + externalId: "attachment-1", + title: "trace.log", + url: "https://paperclip.example/artifacts/trace.log", + status: "ready_for_review", + reviewState: "none", + isPrimary: false, + healthStatus: "healthy", + summary: null, + metadata: { + attachmentId: "attachment-1", + contentType: "text/plain", + byteSize: 2048, + originalFilename: "trace.log" + }, + createdByRunId: null, + createdAt: new Date("2026-06-02T00:00:00Z"), + updatedAt: new Date("2026-06-02T00:00:00Z") + } + ] + } as Partial); + + const payload = buildForgejoIssuePayload(issue); + expect(payload.title).toContain("[PRIA-17]"); + expect(payload.body).toContain(ATTACHMENT_NOTE); + expect(payload.body).toContain("trace.log | text/plain | 2.0 KiB"); + expect(payload.body).toContain(""); + }); + + it("creates a Forgejo issue once and queues manual review when attachment context is required", async () => { + const issue = buildIssue({ + workProducts: [ + { + id: "wp-1", + companyId: "company-1", + projectId: null, + issueId: "issue-1", + executionWorkspaceId: null, + runtimeServiceId: null, + type: "artifact", + provider: "paperclip", + externalId: "attachment-1", + title: "screenshot.png", + url: "https://paperclip.example/artifacts/screenshot.png", + status: "ready_for_review", + reviewState: "none", + isPrimary: false, + healthStatus: "healthy", + summary: null, + metadata: { + attachmentId: "attachment-1", + contentType: "image/png", + byteSize: 1024, + originalFilename: "screenshot.png" + }, + createdByRunId: null, + createdAt: new Date("2026-06-02T00:00:00Z"), + updatedAt: new Date("2026-06-02T00:00:00Z") + } + ] + } as Partial); + const activityLog = vi.fn(async () => undefined); + const reserve = vi.fn(async () => ({ kind: "reserved" as const })); + const createRemoteIssue = vi.fn(async () => ({ + id: 101, + number: 33, + url: "https://forgejo.example/acme/repo/issues/33", + apiUrl: "https://forgejo.example/api/v1/repos/acme/repo/issues/33" + })); + const complete = vi.fn(async () => undefined); + const fail = vi.fn(async () => undefined); + const queueManualReview = vi.fn(async () => undefined); + + const result = await syncIssueToForgejo( + { + config: { get: async () => ({ forgejoOwner: "acme", forgejoRepo: "repo" }) }, + activity: { log: activityLog } + } as never, + issue, + { reserve, createRemoteIssue, complete, fail, queueManualReview } + ); + + expect(result).toBe("created"); + expect(createRemoteIssue).toHaveBeenCalledOnce(); + expect(complete).toHaveBeenCalledWith(expect.anything(), "company-1", "issue-1", expect.objectContaining({ number: 33 })); + expect(queueManualReview).toHaveBeenCalledOnce(); + expect(activityLog).toHaveBeenCalledWith(expect.objectContaining({ + message: "Created Forgejo issue #33 for PRIA-17." + })); + expect(fail).not.toHaveBeenCalled(); + }); + + it("skips remote creation when a synced mapping already exists", async () => { + const issue = buildIssue(); + const createRemoteIssue = vi.fn(); + + const result = await syncIssueToForgejo( + { + config: { get: async () => ({ forgejoOwner: "acme", forgejoRepo: "repo" }) }, + activity: { log: vi.fn(async () => undefined) } + } as never, + issue, + { + reserve: async () => ({ + kind: "existing", + mapping: { + companyId: "company-1", + paperclipIssueId: "issue-1", + repoOwner: "acme", + repoName: "repo", + dedupeKey: "paperclip-issue:company-1:issue-1", + sourceTitle: "[PRIA-17] Fix Forgejo sync", + sourceBody: "body", + attachmentMetadata: [], + manualReviewRequired: false, + reviewReasonCode: null, + forgejoIssueId: 101, + forgejoIssueNumber: 33, + forgejoIssueUrl: "https://forgejo.example/acme/repo/issues/33", + forgejoApiUrl: "https://forgejo.example/api/v1/repos/acme/repo/issues/33", + syncStatus: "synced", + lastError: null + } + }), + createRemoteIssue + } + ); + + expect(result).toBe("existing"); + expect(createRemoteIssue).not.toHaveBeenCalled(); + }); +});