[codex] Add plugin orchestration host APIs (#4114)

## Thinking Path

> - Paperclip orchestrates AI agents for zero-human companies.
> - The plugin system is the extension path for optional capabilities
that should not require core product changes for every integration.
> - Plugins need scoped host APIs for issue orchestration, documents,
wakeups, summaries, activity attribution, and isolated database state.
> - Without those host APIs, richer plugins either cannot coordinate
Paperclip work safely or need privileged core-side special cases.
> - This pull request adds the plugin orchestration host surface, scoped
route dispatch, a database namespace layer, and a smoke plugin that
exercises the contract.
> - The benefit is a broader plugin API that remains company-scoped,
auditable, and covered by tests.

## What Changed

- Added plugin orchestration host APIs for issue creation, document
access, wakeups, summaries, plugin-origin activity, and scoped API route
dispatch.
- Added plugin database namespace tables, schema exports, migration
checks, and idempotent replay coverage under migration
`0059_plugin_database_namespaces`.
- Added shared plugin route/API types and validators used by server and
SDK boundaries.
- Expanded plugin SDK types, protocol helpers, worker RPC host behavior,
and testing utilities for orchestration flows.
- Added the `plugin-orchestration-smoke-example` package to exercise
scoped routes, restricted database namespaces, issue orchestration,
documents, wakeups, summaries, and UI status surfaces.
- Kept the new orchestration smoke fixture out of the root pnpm
workspace importer so this PR preserves the repository policy of not
committing `pnpm-lock.yaml`.
- Updated plugin docs and database docs for the new orchestration and
database namespace surfaces.
- Rebased the branch onto `public-gh/master`, resolved conflicts, and
removed `pnpm-lock.yaml` from the final PR diff.

## Verification

- `pnpm install --frozen-lockfile`
- `pnpm --filter @paperclipai/db typecheck`
- `pnpm exec vitest run packages/db/src/client.test.ts`
- `pnpm exec vitest run server/src/__tests__/plugin-database.test.ts
server/src/__tests__/plugin-orchestration-apis.test.ts
server/src/__tests__/plugin-routes-authz.test.ts
server/src/__tests__/plugin-scoped-api-routes.test.ts
server/src/__tests__/plugin-sdk-orchestration-contract.test.ts`
- From `packages/plugins/examples/plugin-orchestration-smoke-example`:
`pnpm exec vitest run --config ./vitest.config.ts`
- `pnpm --dir
packages/plugins/examples/plugin-orchestration-smoke-example run
typecheck`
- `pnpm --filter @paperclipai/server typecheck`
- PR CI on latest head `293fc67c`: `policy`, `verify`, `e2e`, and
`security/snyk` all passed.

## Risks

- Medium risk: this expands plugin host authority, so route auth,
company scoping, and plugin-origin activity attribution need careful
review.
- Medium risk: database namespace migration behavior must remain
idempotent for environments that may have seen earlier branch versions.
- Medium risk: the orchestration smoke fixture is intentionally excluded
from the root workspace importer to avoid a `pnpm-lock.yaml` PR diff;
direct fixture verification remains listed above.
- Low operational risk from the PR setup itself: the branch is rebased
onto current `master`, the migration is ordered after upstream
`0057`/`0058`, and `pnpm-lock.yaml` is not in the final diff.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected — check the roadmap
first. See `CONTRIBUTING.md`.

Roadmap checked: this work aligns with the completed Plugin system
milestone and extends the plugin surface rather than duplicating an
unrelated planned core feature.

## Model Used

- OpenAI Codex, GPT-5-based coding agent in a tool-enabled CLI
environment. Exact hosted model build and context-window size are not
exposed by the runtime; reasoning/tool use were enabled for repository
inspection, editing, testing, git operations, and PR creation.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [x] If this change affects the UI, I have included before/after
screenshots (N/A: no core UI screen change; example plugin UI contract
is covered by tests)
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge

---------

Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Dotta 2026-04-20 08:52:51 -05:00 committed by GitHub
parent 16b2b84d84
commit 9c6f551595
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
53 changed files with 5584 additions and 53 deletions

View file

@ -11,6 +11,20 @@ import type { PluginEventBus } from "./plugin-event-bus.js";
import { instanceSettingsService } from "./instance-settings.js";
const PLUGIN_EVENT_SET: ReadonlySet<string> = new Set(PLUGIN_EVENT_TYPES);
const ACTIVITY_ACTION_TO_PLUGIN_EVENT: Readonly<Record<string, PluginEventType>> = {
issue_comment_added: "issue.comment.created",
issue_comment_created: "issue.comment.created",
issue_document_created: "issue.document.created",
issue_document_updated: "issue.document.updated",
issue_document_deleted: "issue.document.deleted",
issue_blockers_updated: "issue.relations.updated",
approval_approved: "approval.decided",
approval_rejected: "approval.decided",
approval_revision_requested: "approval.decided",
budget_soft_threshold_crossed: "budget.incident.opened",
budget_hard_threshold_crossed: "budget.incident.opened",
budget_incident_resolved: "budget.incident.resolved",
};
let _pluginEventBus: PluginEventBus | null = null;
@ -22,9 +36,23 @@ export function setPluginEventBus(bus: PluginEventBus): void {
_pluginEventBus = bus;
}
function eventTypeForActivityAction(action: string): PluginEventType | null {
if (PLUGIN_EVENT_SET.has(action)) return action as PluginEventType;
return ACTIVITY_ACTION_TO_PLUGIN_EVENT[action.replaceAll(".", "_")] ?? null;
}
export function publishPluginDomainEvent(event: PluginEvent): void {
if (!_pluginEventBus) return;
void _pluginEventBus.emit(event).then(({ errors }) => {
for (const { pluginId, error } of errors) {
logger.warn({ pluginId, eventType: event.eventType, err: error }, "plugin event handler failed");
}
}).catch(() => {});
}
export interface LogActivityInput {
companyId: string;
actorType: "agent" | "user" | "system";
actorType: "agent" | "user" | "system" | "plugin";
actorId: string;
action: string;
entityType: string;
@ -69,10 +97,11 @@ export async function logActivity(db: Db, input: LogActivityInput) {
},
});
if (_pluginEventBus && PLUGIN_EVENT_SET.has(input.action)) {
const pluginEventType = eventTypeForActivityAction(input.action);
if (pluginEventType) {
const event: PluginEvent = {
eventId: randomUUID(),
eventType: input.action as PluginEventType,
eventType: pluginEventType,
occurredAt: new Date().toISOString(),
actorId: input.actorId,
actorType: input.actorType,
@ -85,10 +114,6 @@ export async function logActivity(db: Db, input: LogActivityInput) {
runId: input.runId ?? null,
},
};
void _pluginEventBus.emit(event).then(({ errors }) => {
for (const { pluginId, error } of errors) {
logger.warn({ pluginId, eventType: event.eventType, err: error }, "plugin event handler failed");
}
}).catch(() => {});
publishPluginDomainEvent(event);
}
}

View file

@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import path from "node:path";
import { execFile as execFileCallback } from "node:child_process";
import { promisify } from "node:util";
import { randomUUID } from "node:crypto";
import { and, asc, desc, eq, getTableColumns, gt, inArray, isNull, or, sql } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import {
@ -60,7 +61,7 @@ import {
classifyRunLiveness,
type RunLivenessClassificationInput,
} from "./run-liveness.js";
import { logActivity, type LogActivityInput } from "./activity-log.js";
import { logActivity, publishPluginDomainEvent, type LogActivityInput } from "./activity-log.js";
import {
buildWorkspaceReadyComment,
cleanupExecutionWorkspaceArtifacts,
@ -2377,11 +2378,50 @@ export function heartbeatService(db: Db) {
finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null,
},
});
publishRunLifecyclePluginEvent(updated);
}
return updated;
}
function publishRunLifecyclePluginEvent(run: typeof heartbeatRuns.$inferSelect) {
const eventType =
run.status === "running"
? "agent.run.started"
: run.status === "succeeded"
? "agent.run.finished"
: run.status === "failed" || run.status === "timed_out"
? "agent.run.failed"
: run.status === "cancelled"
? "agent.run.cancelled"
: null;
if (!eventType) return;
publishPluginDomainEvent({
eventId: randomUUID(),
eventType,
occurredAt: new Date().toISOString(),
actorId: run.agentId,
actorType: "agent",
entityId: run.id,
entityType: "heartbeat_run",
companyId: run.companyId,
payload: {
runId: run.id,
agentId: run.agentId,
status: run.status,
invocationSource: run.invocationSource,
triggerDetail: run.triggerDetail,
error: run.error ?? null,
errorCode: run.errorCode ?? null,
issueId: typeof run.contextSnapshot === "object" && run.contextSnapshot !== null
? (run.contextSnapshot as Record<string, unknown>).issueId ?? null
: null,
startedAt: run.startedAt ? new Date(run.startedAt).toISOString() : null,
finishedAt: run.finishedAt ? new Date(run.finishedAt).toISOString() : null,
},
});
}
async function setWakeupStatus(
wakeupRequestId: string | null | undefined,
status: string,
@ -3054,6 +3094,7 @@ export function heartbeatService(db: Db) {
finishedAt: claimed.finishedAt ? new Date(claimed.finishedAt).toISOString() : null,
},
});
publishRunLifecyclePluginEvent(claimed);
await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });

View file

@ -51,6 +51,7 @@ const OPERATION_CAPABILITIES: Record<string, readonly PluginCapability[]> = {
"project.workspaces.get": ["project.workspaces.read"],
"issues.list": ["issues.read"],
"issues.get": ["issues.read"],
"issues.relations.get": ["issue.relations.read"],
"issue.comments.list": ["issue.comments.read"],
"issue.comments.get": ["issue.comments.read"],
"agents.list": ["agents.read"],
@ -61,14 +62,26 @@ const OPERATION_CAPABILITIES: Record<string, readonly PluginCapability[]> = {
"activity.get": ["activity.read"],
"costs.list": ["costs.read"],
"costs.get": ["costs.read"],
"issues.summaries.getOrchestration": ["issues.orchestration.read"],
"db.namespace": ["database.namespace.read"],
"db.query": ["database.namespace.read"],
// Data write operations
"issues.create": ["issues.create"],
"issues.update": ["issues.update"],
"issues.relations.setBlockedBy": ["issue.relations.write"],
"issues.relations.addBlockers": ["issue.relations.write"],
"issues.relations.removeBlockers": ["issue.relations.write"],
"issues.assertCheckoutOwner": ["issues.checkout"],
"issues.getSubtree": ["issue.subtree.read"],
"issues.requestWakeup": ["issues.wakeup"],
"issues.requestWakeups": ["issues.wakeup"],
"issue.comments.create": ["issue.comments.create"],
"activity.log": ["activity.log.write"],
"metrics.write": ["metrics.write"],
"telemetry.track": ["telemetry.track"],
"db.migrate": ["database.namespace.migrate"],
"db.execute": ["database.namespace.write"],
// Plugin state operations
"plugin.state.get": ["plugin.state.read"],
@ -141,6 +154,7 @@ const FEATURE_CAPABILITIES: Record<string, PluginCapability> = {
tools: "agent.tools.register",
jobs: "jobs.schedule",
webhooks: "webhooks.receive",
database: "database.namespace.migrate",
};
// ---------------------------------------------------------------------------

View file

@ -0,0 +1,498 @@
import { createHash } from "node:crypto";
import { readdir, readFile } from "node:fs/promises";
import path from "node:path";
import { and, eq, sql } from "drizzle-orm";
import type { SQL } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import {
pluginDatabaseNamespaces,
pluginMigrations,
plugins,
} from "@paperclipai/db";
import type {
PaperclipPluginManifestV1,
PluginDatabaseCoreReadTable,
PluginMigrationRecord,
} from "@paperclipai/shared";
const IDENTIFIER_RE = /^[A-Za-z_][A-Za-z0-9_]*$/;
const MAX_POSTGRES_IDENTIFIER_LENGTH = 63;
type SqlRef = { schema: string; table: string; keyword: string };
export type PluginDatabaseRuntimeResult<T = Record<string, unknown>> = {
rows?: T[];
rowCount?: number;
};
export function derivePluginDatabaseNamespace(
pluginKey: string,
namespaceSlug?: string,
): string {
const hash = createHash("sha256").update(pluginKey).digest("hex").slice(0, 10);
const slug = (namespaceSlug ?? pluginKey)
.toLowerCase()
.replace(/[^a-z0-9_]+/g, "_")
.replace(/^_+|_+$/g, "")
.replace(/_+/g, "_")
.slice(0, 36) || "plugin";
const namespace = `plugin_${slug}_${hash}`;
return namespace.slice(0, MAX_POSTGRES_IDENTIFIER_LENGTH);
}
function assertIdentifier(value: string, label = "identifier"): string {
if (!IDENTIFIER_RE.test(value)) {
throw new Error(`Unsafe SQL ${label}: ${value}`);
}
return value;
}
function quoteIdentifier(value: string): string {
return `"${assertIdentifier(value).replaceAll("\"", "\"\"")}"`;
}
function splitSqlStatements(input: string): string[] {
const statements: string[] = [];
let start = 0;
let quote: "'" | "\"" | null = null;
let lineComment = false;
let blockComment = false;
for (let i = 0; i < input.length; i += 1) {
const char = input[i]!;
const next = input[i + 1];
if (lineComment) {
if (char === "\n") lineComment = false;
continue;
}
if (blockComment) {
if (char === "*" && next === "/") {
blockComment = false;
i += 1;
}
continue;
}
if (quote) {
if (char === quote) {
if (next === quote) {
i += 1;
} else {
quote = null;
}
}
continue;
}
if (char === "-" && next === "-") {
lineComment = true;
i += 1;
continue;
}
if (char === "/" && next === "*") {
blockComment = true;
i += 1;
continue;
}
if (char === "'" || char === "\"") {
quote = char;
continue;
}
if (char === ";") {
const statement = input.slice(start, i).trim();
if (statement) statements.push(statement);
start = i + 1;
}
}
const trailing = input.slice(start).trim();
if (trailing) statements.push(trailing);
return statements;
}
function stripSqlForKeywordScan(input: string): string {
return input
.replace(/'([^']|'')*'/g, "''")
.replace(/"([^"]|"")*"/g, "\"\"")
.replace(/--.*$/gm, "")
.replace(/\/\*[\s\S]*?\*\//g, "");
}
function normaliseSql(input: string): string {
return stripSqlForKeywordScan(input).replace(/\s+/g, " ").trim().toLowerCase();
}
function extractQualifiedRefs(statement: string): SqlRef[] {
const refs: SqlRef[] = [];
const patterns = [
/\b(from|join|references|into|update)\s+"?([A-Za-z_][A-Za-z0-9_]*)"?\."?([A-Za-z_][A-Za-z0-9_]*)"?/gi,
/\b(alter\s+table|create\s+table|create\s+view|drop\s+table|truncate\s+table)\s+(?:if\s+(?:not\s+)?exists\s+)?"?([A-Za-z_][A-Za-z0-9_]*)"?\."?([A-Za-z_][A-Za-z0-9_]*)"?/gi,
];
for (const pattern of patterns) {
for (const match of statement.matchAll(pattern)) {
refs.push({ keyword: match[1]!.toLowerCase(), schema: match[2]!, table: match[3]! });
}
}
return refs;
}
function assertAllowedPublicRead(
ref: SqlRef,
allowedCoreReadTables: ReadonlySet<string>,
): void {
if (ref.schema !== "public") return;
if (!allowedCoreReadTables.has(ref.table)) {
throw new Error(`Plugin SQL references public.${ref.table}, which is not whitelisted`);
}
if (!["from", "join", "references"].includes(ref.keyword)) {
throw new Error(`Plugin SQL cannot mutate or define objects in public.${ref.table}`);
}
}
function assertNoBannedSql(statement: string): void {
const normalized = normaliseSql(statement);
const banned = [
/\bcreate\s+extension\b/,
/\bcreate\s+(?:event\s+)?trigger\b/,
/\bcreate\s+(?:or\s+replace\s+)?function\b/,
/\bcreate\s+language\b/,
/\bgrant\b/,
/\brevoke\b/,
/\bsecurity\s+definer\b/,
/\bcopy\b/,
/\bcall\b/,
/\bdo\s+(?:\$\$|language\b)/,
];
const matched = banned.find((pattern) => pattern.test(normalized));
if (matched) {
throw new Error(`Plugin SQL contains a disallowed statement or clause: ${matched.source}`);
}
}
export function validatePluginMigrationStatement(
statement: string,
namespace: string,
coreReadTables: readonly PluginDatabaseCoreReadTable[] = [],
): void {
assertIdentifier(namespace, "namespace");
assertNoBannedSql(statement);
const normalized = normaliseSql(statement);
if (/^\s*(drop|truncate)\b/.test(normalized)) {
throw new Error("Destructive plugin migrations are not allowed in Phase 1");
}
const ddlAllowed = /^(create|alter|comment)\b/.test(normalized);
if (!ddlAllowed) {
throw new Error("Plugin migrations may contain DDL statements only");
}
const refs = extractQualifiedRefs(statement);
if (refs.length === 0 && !normalized.startsWith("comment ")) {
throw new Error("Plugin migration objects must use fully qualified schema names");
}
const allowedCoreReadTables = new Set(coreReadTables);
for (const ref of refs) {
if (ref.schema === namespace) continue;
if (ref.schema === "public") {
assertAllowedPublicRead(ref, allowedCoreReadTables);
continue;
}
throw new Error(`Plugin SQL references schema "${ref.schema}" outside namespace "${namespace}"`);
}
}
export function validatePluginRuntimeQuery(
query: string,
namespace: string,
coreReadTables: readonly PluginDatabaseCoreReadTable[] = [],
): void {
const statements = splitSqlStatements(query);
if (statements.length !== 1) {
throw new Error("Plugin runtime SQL must contain exactly one statement");
}
const statement = statements[0]!;
assertNoBannedSql(statement);
const normalized = normaliseSql(statement);
if (!normalized.startsWith("select ") && !normalized.startsWith("with ")) {
throw new Error("ctx.db.query only allows SELECT statements");
}
if (/\b(insert|update|delete|alter|create|drop|truncate)\b/.test(normalized)) {
throw new Error("ctx.db.query cannot contain mutation or DDL keywords");
}
const allowedCoreReadTables = new Set(coreReadTables);
for (const ref of extractQualifiedRefs(statement)) {
if (ref.schema === namespace) continue;
if (ref.schema === "public") {
assertAllowedPublicRead(ref, allowedCoreReadTables);
continue;
}
throw new Error(`ctx.db.query cannot read schema "${ref.schema}"`);
}
}
export function validatePluginRuntimeExecute(query: string, namespace: string): void {
const statements = splitSqlStatements(query);
if (statements.length !== 1) {
throw new Error("Plugin runtime SQL must contain exactly one statement");
}
const statement = statements[0]!;
assertNoBannedSql(statement);
const normalized = normaliseSql(statement);
if (!/^(insert\s+into|update|delete\s+from)\b/.test(normalized)) {
throw new Error("ctx.db.execute only allows INSERT, UPDATE, or DELETE");
}
if (/\b(alter|create|drop|truncate)\b/.test(normalized)) {
throw new Error("ctx.db.execute cannot contain DDL keywords");
}
const refs = extractQualifiedRefs(statement);
const target = refs.find((ref) => ["into", "update", "from"].includes(ref.keyword));
if (!target || target.schema !== namespace) {
throw new Error(`ctx.db.execute target must be inside plugin namespace "${namespace}"`);
}
for (const ref of refs) {
if (ref.schema !== namespace) {
throw new Error("ctx.db.execute cannot reference public or other non-plugin schemas");
}
}
}
function bindSql(statement: string, params: readonly unknown[] = []): SQL {
// Safe only after callers run the plugin SQL validators above.
if (params.length === 0) return sql.raw(statement);
const chunks: SQL[] = [];
let cursor = 0;
const placeholderPattern = /\$(\d+)/g;
const seen = new Set<number>();
for (const match of statement.matchAll(placeholderPattern)) {
const index = Number(match[1]);
if (!Number.isInteger(index) || index < 1 || index > params.length) {
throw new Error(`SQL placeholder $${match[1]} has no matching parameter`);
}
chunks.push(sql.raw(statement.slice(cursor, match.index)));
chunks.push(sql`${params[index - 1]}`);
seen.add(index);
cursor = match.index! + match[0].length;
}
chunks.push(sql.raw(statement.slice(cursor)));
if (seen.size !== params.length) {
throw new Error("Every ctx.db parameter must be referenced by a $n placeholder");
}
return sql.join(chunks, sql.raw(""));
}
async function listSqlMigrationFiles(migrationsDir: string): Promise<string[]> {
const entries = await readdir(migrationsDir, { withFileTypes: true });
return entries
.filter((entry) => entry.isFile() && entry.name.endsWith(".sql"))
.map((entry) => entry.name)
.sort((a, b) => a.localeCompare(b));
}
function resolveMigrationsDir(packageRoot: string, migrationsDir: string): string {
const resolvedRoot = path.resolve(packageRoot);
const resolvedDir = path.resolve(resolvedRoot, migrationsDir);
const relative = path.relative(resolvedRoot, resolvedDir);
if (relative.startsWith("..") || path.isAbsolute(relative)) {
throw new Error(`Plugin migrationsDir escapes package root: ${migrationsDir}`);
}
return resolvedDir;
}
export function pluginDatabaseService(db: Db) {
async function getPluginRecord(pluginId: string) {
const rows = await db.select().from(plugins).where(eq(plugins.id, pluginId)).limit(1);
const plugin = rows[0];
if (!plugin) throw new Error(`Plugin not found: ${pluginId}`);
return plugin;
}
async function ensureNamespace(pluginId: string, manifest: PaperclipPluginManifestV1) {
if (!manifest.database) return null;
const namespaceName = derivePluginDatabaseNamespace(
manifest.id,
manifest.database.namespaceSlug,
);
await db.execute(sql.raw(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(namespaceName)}`));
const rows = await db
.insert(pluginDatabaseNamespaces)
.values({
pluginId,
pluginKey: manifest.id,
namespaceName,
namespaceMode: "schema",
status: "active",
})
.onConflictDoUpdate({
target: pluginDatabaseNamespaces.pluginId,
set: {
pluginKey: manifest.id,
namespaceName,
namespaceMode: "schema",
status: "active",
updatedAt: new Date(),
},
})
.returning();
return rows[0] ?? null;
}
async function getNamespace(pluginId: string) {
const rows = await db
.select()
.from(pluginDatabaseNamespaces)
.where(eq(pluginDatabaseNamespaces.pluginId, pluginId))
.limit(1);
return rows[0] ?? null;
}
async function getRuntimeNamespace(pluginId: string) {
const namespace = await getNamespace(pluginId);
if (!namespace || namespace.status !== "active") {
throw new Error("Plugin database namespace is not active");
}
return namespace.namespaceName;
}
async function recordMigrationFailure(input: {
pluginId: string;
pluginKey: string;
namespaceName: string;
migrationKey: string;
checksum: string;
pluginVersion: string;
error: unknown;
}): Promise<void> {
const message = input.error instanceof Error ? input.error.message : String(input.error);
await db
.insert(pluginMigrations)
.values({
pluginId: input.pluginId,
pluginKey: input.pluginKey,
namespaceName: input.namespaceName,
migrationKey: input.migrationKey,
checksum: input.checksum,
pluginVersion: input.pluginVersion,
status: "failed",
errorMessage: message,
})
.onConflictDoUpdate({
target: [pluginMigrations.pluginId, pluginMigrations.migrationKey],
set: {
checksum: input.checksum,
pluginVersion: input.pluginVersion,
status: "failed",
errorMessage: message,
startedAt: new Date(),
appliedAt: null,
},
});
await db
.update(pluginDatabaseNamespaces)
.set({ status: "migration_failed", updatedAt: new Date() })
.where(eq(pluginDatabaseNamespaces.pluginId, input.pluginId));
}
return {
ensureNamespace,
async applyMigrations(pluginId: string, manifest: PaperclipPluginManifestV1, packageRoot: string) {
if (!manifest.database) return null;
const namespace = await ensureNamespace(pluginId, manifest);
if (!namespace) return null;
const migrationDir = resolveMigrationsDir(packageRoot, manifest.database.migrationsDir);
const migrationFiles = await listSqlMigrationFiles(migrationDir);
const coreReadTables = manifest.database.coreReadTables ?? [];
const lockKey = Number.parseInt(createHash("sha256").update(pluginId).digest("hex").slice(0, 12), 16);
await db.transaction(async (tx) => {
await tx.execute(sql`SELECT pg_advisory_xact_lock(${lockKey})`);
for (const migrationKey of migrationFiles) {
const content = await readFile(path.join(migrationDir, migrationKey), "utf8");
const checksum = createHash("sha256").update(content).digest("hex");
const existingRows = await tx
.select()
.from(pluginMigrations)
.where(and(eq(pluginMigrations.pluginId, pluginId), eq(pluginMigrations.migrationKey, migrationKey)))
.limit(1);
const existing = existingRows[0] as PluginMigrationRecord | undefined;
if (existing?.status === "applied") {
if (existing.checksum !== checksum) {
throw new Error(`Plugin migration checksum mismatch for ${migrationKey}`);
}
continue;
}
const statements = splitSqlStatements(content);
try {
if (statements.length === 0) {
throw new Error(`Plugin migration ${migrationKey} is empty`);
}
for (const statement of statements) {
validatePluginMigrationStatement(statement, namespace.namespaceName, coreReadTables);
await tx.execute(sql.raw(statement));
}
await tx
.insert(pluginMigrations)
.values({
pluginId,
pluginKey: manifest.id,
namespaceName: namespace.namespaceName,
migrationKey,
checksum,
pluginVersion: manifest.version,
status: "applied",
appliedAt: new Date(),
})
.onConflictDoUpdate({
target: [pluginMigrations.pluginId, pluginMigrations.migrationKey],
set: {
checksum,
pluginVersion: manifest.version,
status: "applied",
errorMessage: null,
startedAt: new Date(),
appliedAt: new Date(),
},
});
} catch (error) {
await recordMigrationFailure({
pluginId,
pluginKey: manifest.id,
namespaceName: namespace.namespaceName,
migrationKey,
checksum,
pluginVersion: manifest.version,
error,
});
throw error;
}
}
});
return namespace;
},
getRuntimeNamespace,
async query<T = Record<string, unknown>>(pluginId: string, statement: string, params?: unknown[]): Promise<T[]> {
const plugin = await getPluginRecord(pluginId);
const namespace = await getRuntimeNamespace(pluginId);
validatePluginRuntimeQuery(statement, namespace, plugin.manifestJson.database?.coreReadTables ?? []);
const result = await db.execute(bindSql(statement, params));
return Array.from(result as Iterable<T>);
},
async execute(pluginId: string, statement: string, params?: unknown[]): Promise<{ rowCount: number }> {
const namespace = await getRuntimeNamespace(pluginId);
validatePluginRuntimeExecute(statement, namespace);
const result = await db.execute(bindSql(statement, params));
return { rowCount: Number((result as { count?: number | string }).count ?? 0) };
},
};
}

View file

@ -1,6 +1,14 @@
import type { Db } from "@paperclipai/db";
import { pluginLogs, agentTaskSessions as agentTaskSessionsTable } from "@paperclipai/db";
import { eq, and, like, desc } from "drizzle-orm";
import {
agentTaskSessions as agentTaskSessionsTable,
agents as agentsTable,
budgetIncidents,
costEvents,
heartbeatRuns,
issues as issuesTable,
pluginLogs,
} from "@paperclipai/db";
import { eq, and, like, desc, inArray, sql } from "drizzle-orm";
import type {
HostServices,
Company,
@ -10,7 +18,10 @@ import type {
Goal,
PluginWorkspace,
IssueComment,
PluginIssueAssigneeSummary,
PluginIssueOrchestrationSummary,
} from "@paperclipai/plugin-sdk";
import type { IssueDocumentSummary } from "@paperclipai/shared";
import { companyService } from "./companies.js";
import { agentService } from "./agents.js";
import { projectService } from "./projects.js";
@ -18,6 +29,8 @@ import { issueService } from "./issues.js";
import { goalService } from "./goals.js";
import { documentService } from "./documents.js";
import { heartbeatService } from "./heartbeat.js";
import { budgetService } from "./budgets.js";
import { issueApprovalService } from "./issue-approvals.js";
import { subscribeCompanyLiveEvents } from "./live-events.js";
import { randomUUID } from "node:crypto";
import { activityService } from "./activity.js";
@ -25,6 +38,7 @@ import { costService } from "./costs.js";
import { assetService } from "./assets.js";
import { pluginRegistryService } from "./plugin-registry.js";
import { pluginStateStore } from "./plugin-state-store.js";
import { pluginDatabaseService } from "./plugin-database.js";
import { createPluginSecretsHandler } from "./plugin-secrets-handler.js";
import { logActivity } from "./activity-log.js";
import type { PluginEventBus } from "./plugin-event-bus.js";
@ -447,6 +461,7 @@ export function buildHostServices(
): HostServices & { dispose(): void } {
const registry = pluginRegistryService(db);
const stateStore = pluginStateStore(db);
const pluginDb = pluginDatabaseService(db);
const secretsHandler = createPluginSecretsHandler({ db, pluginId });
const companies = companyService(db);
const agents = agentService(db);
@ -457,6 +472,8 @@ export function buildHostServices(
const goals = goalService(db);
const activity = activityService(db);
const costs = costService(db);
const budgets = budgetService(db);
const issueApprovals = issueApprovalService(db);
const assets = assetService(db);
const scopedBus = eventBus.forPlugin(pluginKey);
@ -512,6 +529,216 @@ export function buildHostServices(
return record;
};
const pluginActivityDetails = (
details: Record<string, unknown> | null | undefined,
actor?: { actorAgentId?: string | null; actorUserId?: string | null; actorRunId?: string | null },
) => {
const initiatingActorType = actor?.actorAgentId ? "agent" : actor?.actorUserId ? "user" : null;
const initiatingActorId = actor?.actorAgentId ?? actor?.actorUserId ?? null;
return {
...(details ?? {}),
sourcePluginId: pluginId,
sourcePluginKey: pluginKey,
initiatingActorType,
initiatingActorId,
initiatingAgentId: actor?.actorAgentId ?? null,
initiatingUserId: actor?.actorUserId ?? null,
initiatingRunId: actor?.actorRunId ?? null,
pluginId,
pluginKey,
};
};
const defaultPluginOriginKind = `plugin:${pluginKey}`;
const normalizePluginOriginKind = (originKind: unknown = defaultPluginOriginKind) => {
if (originKind == null || originKind === "") return defaultPluginOriginKind;
if (typeof originKind !== "string") {
throw new Error("Plugin issue originKind must be a string");
}
if (originKind === defaultPluginOriginKind || originKind.startsWith(`${defaultPluginOriginKind}:`)) {
return originKind;
}
throw new Error(`Plugin may only use originKind values under ${defaultPluginOriginKind}`);
};
const assertReadableOriginFilter = (originKind: unknown) => {
if (typeof originKind !== "string" || !originKind.startsWith("plugin:")) return;
normalizePluginOriginKind(originKind);
};
const logPluginActivity = async (input: {
companyId: string;
action: string;
entityType: string;
entityId: string;
details?: Record<string, unknown> | null;
actor?: { actorAgentId?: string | null; actorUserId?: string | null; actorRunId?: string | null };
}) => {
await logActivity(db, {
companyId: input.companyId,
actorType: "plugin",
actorId: pluginId,
agentId: input.actor?.actorAgentId ?? null,
runId: input.actor?.actorRunId ?? null,
action: input.action,
entityType: input.entityType,
entityId: input.entityId,
details: pluginActivityDetails(input.details, input.actor),
});
};
const collectIssueSubtreeIds = async (companyId: string, rootIssueId: string) => {
const seen = new Set<string>([rootIssueId]);
let frontier = [rootIssueId];
while (frontier.length > 0) {
const children = await db
.select({ id: issuesTable.id })
.from(issuesTable)
.where(and(eq(issuesTable.companyId, companyId), inArray(issuesTable.parentId, frontier)));
frontier = children.map((child) => child.id).filter((id) => !seen.has(id));
for (const id of frontier) seen.add(id);
}
return [...seen];
};
const getIssueRunSummaries = async (
companyId: string,
issueIds: string[],
options: { activeOnly?: boolean } = {},
) => {
if (issueIds.length === 0) return [];
const issueIdExpr = sql<string | null>`${heartbeatRuns.contextSnapshot} ->> 'issueId'`;
const statusCondition = options.activeOnly
? inArray(heartbeatRuns.status, ["queued", "running"])
: undefined;
const rows = await db
.select({
id: heartbeatRuns.id,
issueId: issueIdExpr,
agentId: heartbeatRuns.agentId,
status: heartbeatRuns.status,
invocationSource: heartbeatRuns.invocationSource,
triggerDetail: heartbeatRuns.triggerDetail,
startedAt: heartbeatRuns.startedAt,
finishedAt: heartbeatRuns.finishedAt,
error: heartbeatRuns.error,
createdAt: heartbeatRuns.createdAt,
})
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.companyId, companyId), inArray(issueIdExpr, issueIds), statusCondition))
.orderBy(desc(heartbeatRuns.createdAt))
.limit(100);
return rows.map((row) => ({
...row,
startedAt: row.startedAt?.toISOString() ?? null,
finishedAt: row.finishedAt?.toISOString() ?? null,
createdAt: row.createdAt.toISOString(),
}));
};
const setBlockedByWithActivity = async (params: {
issueId: string;
companyId: string;
blockedByIssueIds: string[];
mutation: "set" | "add" | "remove";
actorAgentId?: string | null;
actorUserId?: string | null;
actorRunId?: string | null;
}) => {
const existing = requireInCompany("Issue", await issues.getById(params.issueId), params.companyId);
const previous = await issues.getRelationSummaries(params.issueId);
await issues.update(params.issueId, {
blockedByIssueIds: params.blockedByIssueIds,
actorAgentId: params.actorAgentId ?? null,
actorUserId: params.actorUserId ?? null,
} as any);
const relations = await issues.getRelationSummaries(params.issueId);
await logPluginActivity({
companyId: params.companyId,
action: "issue.relations.updated",
entityType: "issue",
entityId: params.issueId,
actor: {
actorAgentId: params.actorAgentId,
actorUserId: params.actorUserId,
actorRunId: params.actorRunId,
},
details: {
identifier: existing.identifier,
mutation: params.mutation,
blockedByIssueIds: params.blockedByIssueIds,
previousBlockedByIssueIds: previous.blockedBy.map((relation) => relation.id),
},
});
return relations;
};
const getIssueCostSummary = async (
companyId: string,
issueIds: string[],
billingCode?: string | null,
) => {
const scopeConditions = [
issueIds.length > 0 ? inArray(costEvents.issueId, issueIds) : undefined,
billingCode ? eq(costEvents.billingCode, billingCode) : undefined,
].filter((condition): condition is NonNullable<typeof condition> => Boolean(condition));
if (scopeConditions.length === 0) {
return {
costCents: 0,
inputTokens: 0,
cachedInputTokens: 0,
outputTokens: 0,
billingCode: billingCode ?? null,
};
}
const scopeCondition = scopeConditions.length === 1 ? scopeConditions[0]! : and(...scopeConditions);
const [row] = await db
.select({
costCents: sql<number>`coalesce(sum(${costEvents.costCents}), 0)::double precision`,
inputTokens: sql<number>`coalesce(sum(${costEvents.inputTokens}), 0)::double precision`,
cachedInputTokens: sql<number>`coalesce(sum(${costEvents.cachedInputTokens}), 0)::double precision`,
outputTokens: sql<number>`coalesce(sum(${costEvents.outputTokens}), 0)::double precision`,
})
.from(costEvents)
.where(and(eq(costEvents.companyId, companyId), scopeCondition));
return {
costCents: Number(row?.costCents ?? 0),
inputTokens: Number(row?.inputTokens ?? 0),
cachedInputTokens: Number(row?.cachedInputTokens ?? 0),
outputTokens: Number(row?.outputTokens ?? 0),
billingCode: billingCode ?? null,
};
};
const getOpenBudgetIncidents = async (companyId: string) => {
const rows = await db
.select({
id: budgetIncidents.id,
scopeType: budgetIncidents.scopeType,
scopeId: budgetIncidents.scopeId,
metric: budgetIncidents.metric,
windowKind: budgetIncidents.windowKind,
thresholdType: budgetIncidents.thresholdType,
amountLimit: budgetIncidents.amountLimit,
amountObserved: budgetIncidents.amountObserved,
status: budgetIncidents.status,
approvalId: budgetIncidents.approvalId,
createdAt: budgetIncidents.createdAt,
})
.from(budgetIncidents)
.where(and(eq(budgetIncidents.companyId, companyId), eq(budgetIncidents.status, "open")))
.orderBy(desc(budgetIncidents.createdAt));
return rows.map((row) => ({
...row,
createdAt: row.createdAt.toISOString(),
}));
};
return {
config: {
async get() {
@ -544,6 +771,18 @@ export function buildHostServices(
},
},
db: {
async namespace() {
return pluginDb.getRuntimeNamespace(pluginId);
},
async query(params) {
return pluginDb.query(pluginId, params.sql, params.params);
},
async execute(params) {
return pluginDb.execute(pluginId, params.sql, params.params);
},
},
entities: {
async upsert(params) {
return registry.upsertEntity(pluginId, params as any) as any;
@ -604,12 +843,12 @@ export function buildHostServices(
await ensurePluginAvailableForCompany(companyId);
await logActivity(db, {
companyId,
actorType: "system",
actorType: "plugin",
actorId: pluginId,
action: params.message,
entityType: params.entityType ?? "plugin",
entityId: params.entityId ?? pluginId,
details: params.metadata,
details: pluginActivityDetails(params.metadata),
});
},
},
@ -775,6 +1014,7 @@ export function buildHostServices(
async list(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
assertReadableOriginFilter(params.originKind);
return applyWindow((await issues.list(companyId, params as any)) as Issue[], params);
},
async get(params) {
@ -786,13 +1026,456 @@ export function buildHostServices(
async create(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
return (await issues.create(companyId, params as any)) as Issue;
const { actorAgentId, actorUserId, actorRunId, originKind, ...issueInput } = params;
const normalizedOriginKind = normalizePluginOriginKind(originKind);
const issue = (await issues.create(companyId, {
...(issueInput as any),
originKind: normalizedOriginKind,
originId: params.originId ?? null,
originRunId: params.originRunId ?? actorRunId ?? null,
createdByAgentId: actorAgentId ?? null,
createdByUserId: actorUserId ?? null,
})) as Issue;
await logPluginActivity({
companyId,
action: "issue.created",
entityType: "issue",
entityId: issue.id,
actor: { actorAgentId, actorUserId, actorRunId },
details: {
title: issue.title,
identifier: issue.identifier,
originKind: normalizedOriginKind,
originId: issue.originId,
billingCode: issue.billingCode,
blockedByIssueIds: params.blockedByIssueIds ?? [],
},
});
return issue;
},
async update(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const existing = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const patch = { ...(params.patch as Record<string, unknown>) };
const actorAgentId = typeof patch.actorAgentId === "string" ? patch.actorAgentId : null;
const actorUserId = typeof patch.actorUserId === "string" ? patch.actorUserId : null;
const actorRunId = typeof patch.actorRunId === "string" ? patch.actorRunId : null;
delete patch.actorAgentId;
delete patch.actorUserId;
delete patch.actorRunId;
if (patch.originKind !== undefined) {
patch.originKind = normalizePluginOriginKind(patch.originKind);
}
const updated = (await issues.update(params.issueId, {
...(patch as any),
actorAgentId,
actorUserId,
})) as Issue;
await logPluginActivity({
companyId,
action: "issue.updated",
entityType: "issue",
entityId: updated.id,
actor: { actorAgentId, actorUserId, actorRunId },
details: {
identifier: updated.identifier,
patch,
_previous: {
status: existing.status,
assigneeAgentId: existing.assigneeAgentId,
assigneeUserId: existing.assigneeUserId,
},
},
});
return updated;
},
async getRelations(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
return (await issues.update(params.issueId, params.patch as any)) as Issue;
return await issues.getRelationSummaries(params.issueId);
},
async setBlockedBy(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
return setBlockedByWithActivity({
companyId,
issueId: params.issueId,
blockedByIssueIds: params.blockedByIssueIds,
mutation: "set",
actorAgentId: params.actorAgentId,
actorUserId: params.actorUserId,
actorRunId: params.actorRunId,
});
},
async addBlockers(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const previous = await issues.getRelationSummaries(params.issueId);
const nextBlockedByIssueIds = [
...new Set([
...previous.blockedBy.map((relation) => relation.id),
...params.blockerIssueIds,
]),
];
return setBlockedByWithActivity({
companyId,
issueId: params.issueId,
blockedByIssueIds: nextBlockedByIssueIds,
mutation: "add",
actorAgentId: params.actorAgentId,
actorUserId: params.actorUserId,
actorRunId: params.actorRunId,
});
},
async removeBlockers(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const previous = await issues.getRelationSummaries(params.issueId);
const removals = new Set(params.blockerIssueIds);
const nextBlockedByIssueIds = previous.blockedBy
.map((relation) => relation.id)
.filter((issueId) => !removals.has(issueId));
return setBlockedByWithActivity({
companyId,
issueId: params.issueId,
blockedByIssueIds: nextBlockedByIssueIds,
mutation: "remove",
actorAgentId: params.actorAgentId,
actorUserId: params.actorUserId,
actorRunId: params.actorRunId,
});
},
async assertCheckoutOwner(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const ownership = await issues.assertCheckoutOwner(
params.issueId,
params.actorAgentId,
params.actorRunId,
);
if (ownership.adoptedFromRunId) {
await logPluginActivity({
companyId,
action: "issue.checkout_lock_adopted",
entityType: "issue",
entityId: params.issueId,
actor: {
actorAgentId: params.actorAgentId,
actorRunId: params.actorRunId,
},
details: {
previousCheckoutRunId: ownership.adoptedFromRunId,
checkoutRunId: params.actorRunId,
reason: "stale_checkout_run",
},
});
}
return {
issueId: ownership.id,
status: ownership.status as Issue["status"],
assigneeAgentId: ownership.assigneeAgentId,
checkoutRunId: ownership.checkoutRunId,
adoptedFromRunId: ownership.adoptedFromRunId,
};
},
async getSubtree(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const rootIssue = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const includeRoot = params.includeRoot !== false;
const subtreeIssueIds = await collectIssueSubtreeIds(companyId, rootIssue.id);
const issueIds = includeRoot ? subtreeIssueIds : subtreeIssueIds.filter((issueId) => issueId !== rootIssue.id);
const issueRows = issueIds.length > 0
? await db
.select()
.from(issuesTable)
.where(and(eq(issuesTable.companyId, companyId), inArray(issuesTable.id, issueIds)))
: [];
const issuesById = new Map(issueRows.map((issue) => [issue.id, issue as Issue]));
const outputIssues = issueIds
.map((issueId) => issuesById.get(issueId))
.filter((issue): issue is Issue => Boolean(issue));
const assigneeAgentIds = [
...new Set(outputIssues.map((issue) => issue.assigneeAgentId).filter((id): id is string => Boolean(id))),
];
const [relationPairs, documentPairs, activeRunRows, assigneeRows] = await Promise.all([
params.includeRelations
? Promise.all(issueIds.map(async (issueId) => [issueId, await issues.getRelationSummaries(issueId)] as const))
: Promise.resolve(null),
params.includeDocuments
? Promise.all(
issueIds.map(async (issueId) => {
const docs = await documents.listIssueDocuments(issueId);
const summaries: IssueDocumentSummary[] = docs.map((document) => {
const { body: _body, ...summary } = document as typeof document & { body?: string };
return { ...summary, format: "markdown" as const };
});
return [
issueId,
summaries,
] as const;
}),
)
: Promise.resolve(null),
params.includeActiveRuns
? getIssueRunSummaries(companyId, issueIds, { activeOnly: true })
: Promise.resolve(null),
params.includeAssignees && assigneeAgentIds.length > 0
? db
.select({
id: agentsTable.id,
name: agentsTable.name,
role: agentsTable.role,
title: agentsTable.title,
status: agentsTable.status,
})
.from(agentsTable)
.where(and(eq(agentsTable.companyId, companyId), inArray(agentsTable.id, assigneeAgentIds)))
: Promise.resolve(params.includeAssignees ? [] : null),
]);
const activeRuns = activeRunRows
? Object.fromEntries(issueIds.map((issueId) => [
issueId,
activeRunRows.filter((run) => run.issueId === issueId),
]))
: undefined;
return {
rootIssueId: rootIssue.id,
companyId,
issueIds,
issues: outputIssues,
...(relationPairs ? { relations: Object.fromEntries(relationPairs) } : {}),
...(documentPairs ? { documents: Object.fromEntries(documentPairs) } : {}),
...(activeRuns ? { activeRuns } : {}),
...(assigneeRows
? {
assignees: Object.fromEntries(assigneeRows.map((agent) => [
agent.id,
{ ...agent, status: agent.status as Agent["status"] } as PluginIssueAssigneeSummary,
])),
}
: {}),
};
},
async requestWakeup(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const issue = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
if (!issue.assigneeAgentId) {
throw new Error("Issue has no assigned agent to wake");
}
if (["backlog", "done", "cancelled"].includes(issue.status)) {
throw new Error(`Issue is not wakeable in status: ${issue.status}`);
}
const relations = await issues.getRelationSummaries(issue.id);
const unresolvedBlockers = relations.blockedBy.filter((blocker) => blocker.status !== "done");
if (unresolvedBlockers.length > 0) {
throw new Error("Issue is blocked by unresolved blockers");
}
const budgetBlock = await budgets.getInvocationBlock(companyId, issue.assigneeAgentId, {
issueId: issue.id,
projectId: issue.projectId,
});
if (budgetBlock) {
throw new Error(budgetBlock.reason);
}
const contextSource = params.contextSource ?? "plugin.issue.requestWakeup";
const run = await heartbeat.wakeup(issue.assigneeAgentId, {
source: "assignment",
triggerDetail: "system",
reason: params.reason ?? "plugin_issue_wakeup_requested",
payload: {
issueId: issue.id,
mutation: "plugin_wakeup",
pluginId,
pluginKey,
contextSource,
},
idempotencyKey: params.idempotencyKey ?? null,
requestedByActorType: "system",
requestedByActorId: pluginId,
contextSnapshot: {
issueId: issue.id,
taskId: issue.id,
wakeReason: params.reason ?? "plugin_issue_wakeup_requested",
source: contextSource,
pluginId,
pluginKey,
},
});
await logPluginActivity({
companyId,
action: "issue.assignment_wakeup_requested",
entityType: "issue",
entityId: issue.id,
actor: {
actorAgentId: params.actorAgentId,
actorUserId: params.actorUserId,
actorRunId: params.actorRunId,
},
details: {
identifier: issue.identifier,
assigneeAgentId: issue.assigneeAgentId,
runId: run?.id ?? null,
reason: params.reason ?? "plugin_issue_wakeup_requested",
contextSource,
},
});
return { queued: Boolean(run), runId: run?.id ?? null };
},
async requestWakeups(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const results = [];
for (const issueId of [...new Set(params.issueIds)]) {
const issue = requireInCompany("Issue", await issues.getById(issueId), companyId);
if (!issue.assigneeAgentId) {
throw new Error("Issue has no assigned agent to wake");
}
if (["backlog", "done", "cancelled"].includes(issue.status)) {
throw new Error(`Issue is not wakeable in status: ${issue.status}`);
}
const relations = await issues.getRelationSummaries(issue.id);
const unresolvedBlockers = relations.blockedBy.filter((blocker) => blocker.status !== "done");
if (unresolvedBlockers.length > 0) {
throw new Error("Issue is blocked by unresolved blockers");
}
const budgetBlock = await budgets.getInvocationBlock(companyId, issue.assigneeAgentId, {
issueId: issue.id,
projectId: issue.projectId,
});
if (budgetBlock) {
throw new Error(budgetBlock.reason);
}
const contextSource = params.contextSource ?? "plugin.issue.requestWakeups";
const run = await heartbeat.wakeup(issue.assigneeAgentId, {
source: "assignment",
triggerDetail: "system",
reason: params.reason ?? "plugin_issue_wakeup_requested",
payload: {
issueId: issue.id,
mutation: "plugin_wakeup",
pluginId,
pluginKey,
contextSource,
},
idempotencyKey: params.idempotencyKeyPrefix ? `${params.idempotencyKeyPrefix}:${issue.id}` : null,
requestedByActorType: "system",
requestedByActorId: pluginId,
contextSnapshot: {
issueId: issue.id,
taskId: issue.id,
wakeReason: params.reason ?? "plugin_issue_wakeup_requested",
source: contextSource,
pluginId,
pluginKey,
},
});
await logPluginActivity({
companyId,
action: "issue.assignment_wakeup_requested",
entityType: "issue",
entityId: issue.id,
actor: {
actorAgentId: params.actorAgentId,
actorUserId: params.actorUserId,
actorRunId: params.actorRunId,
},
details: {
identifier: issue.identifier,
assigneeAgentId: issue.assigneeAgentId,
runId: run?.id ?? null,
reason: params.reason ?? "plugin_issue_wakeup_requested",
contextSource,
},
});
results.push({ issueId: issue.id, queued: Boolean(run), runId: run?.id ?? null });
}
return results;
},
async getOrchestrationSummary(params): Promise<PluginIssueOrchestrationSummary> {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const rootIssue = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const subtreeIssueIds = params.includeSubtree
? await collectIssueSubtreeIds(companyId, rootIssue.id)
: [rootIssue.id];
const relationPairs = await Promise.all(
subtreeIssueIds.map(async (issueId) => [issueId, await issues.getRelationSummaries(issueId)] as const),
);
const approvalRows = (
await Promise.all(
subtreeIssueIds.map(async (issueId) => {
const rows = await issueApprovals.listApprovalsForIssue(issueId);
return rows.map((approval) => ({
issueId,
id: approval.id,
type: approval.type,
status: approval.status,
requestedByAgentId: approval.requestedByAgentId,
requestedByUserId: approval.requestedByUserId,
decidedByUserId: approval.decidedByUserId,
decidedAt: approval.decidedAt?.toISOString() ?? null,
createdAt: approval.createdAt.toISOString(),
}));
}),
)
).flat();
const [runs, costsSummary, openBudgetIncidents] = await Promise.all([
getIssueRunSummaries(companyId, subtreeIssueIds),
getIssueCostSummary(companyId, subtreeIssueIds, params.billingCode ?? rootIssue.billingCode ?? null),
getOpenBudgetIncidents(companyId),
]);
const issueRows = await db
.select({
id: issuesTable.id,
assigneeAgentId: issuesTable.assigneeAgentId,
projectId: issuesTable.projectId,
})
.from(issuesTable)
.where(and(eq(issuesTable.companyId, companyId), inArray(issuesTable.id, subtreeIssueIds)));
const invocationBlocks = (
await Promise.all(
issueRows
.filter((issueRow) => issueRow.assigneeAgentId)
.map(async (issueRow) => {
const block = await budgets.getInvocationBlock(companyId, issueRow.assigneeAgentId!, {
issueId: issueRow.id,
projectId: issueRow.projectId,
});
return block
? {
issueId: issueRow.id,
agentId: issueRow.assigneeAgentId!,
scopeType: block.scopeType,
scopeId: block.scopeId,
scopeName: block.scopeName,
reason: block.reason,
}
: null;
}),
)
).filter((block): block is NonNullable<typeof block> => block !== null);
return {
issueId: rootIssue.id,
companyId,
subtreeIssueIds,
relations: Object.fromEntries(relationPairs),
approvals: approvalRows,
runs,
costs: costsSummary,
openBudgetIncidents,
invocationBlocks,
};
},
async listComments(params) {
const companyId = ensureCompanyId(params.companyId);
@ -803,12 +1486,25 @@ export function buildHostServices(
async createComment(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
return (await issues.addComment(
const issue = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const comment = (await issues.addComment(
params.issueId,
params.body,
{ agentId: params.authorAgentId },
)) as IssueComment;
await logPluginActivity({
companyId,
action: "issue.comment.created",
entityType: "issue",
entityId: issue.id,
actor: { actorAgentId: params.authorAgentId ?? null },
details: {
identifier: issue.identifier,
commentId: comment.id,
bodySnippet: comment.body.slice(0, 120),
},
});
return comment;
},
},
@ -830,7 +1526,7 @@ export function buildHostServices(
async upsert(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const issue = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const result = await documents.upsertIssueDocument({
issueId: params.issueId,
key: params.key,
@ -839,13 +1535,35 @@ export function buildHostServices(
format: params.format ?? "markdown",
changeSummary: params.changeSummary ?? null,
});
await logPluginActivity({
companyId,
action: "issue.document_upserted",
entityType: "issue",
entityId: issue.id,
details: {
identifier: issue.identifier,
documentKey: params.key,
title: params.title ?? null,
format: params.format ?? "markdown",
},
});
return result.document as any;
},
async delete(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
const issue = requireInCompany("Issue", await issues.getById(params.issueId), companyId);
await documents.deleteIssueDocument(params.issueId, params.key);
await logPluginActivity({
companyId,
action: "issue.document_deleted",
entityType: "issue",
entityId: issue.id,
details: {
identifier: issue.identifier,
documentKey: params.key,
},
});
},
},

View file

@ -48,6 +48,7 @@ import type { PluginJobScheduler } from "./plugin-job-scheduler.js";
import type { PluginJobStore } from "./plugin-job-store.js";
import type { PluginToolDispatcher } from "./plugin-tool-dispatcher.js";
import type { PluginLifecycleManager } from "./plugin-lifecycle.js";
import { pluginDatabaseService } from "./plugin-database.js";
const execFileAsync = promisify(execFile);
const __dirname = path.dirname(fileURLToPath(import.meta.url));
@ -147,6 +148,9 @@ export interface PluginLoaderOptions {
*/
localPluginDir?: string;
/** Optional direct Postgres connection used for plugin DDL migrations. */
migrationDb?: Db;
/**
* Whether to scan the local filesystem directory for plugins.
* Defaults to true.
@ -735,6 +739,7 @@ export function pluginLoader(
): PluginLoader {
const {
localPluginDir = DEFAULT_LOCAL_PLUGIN_DIR,
migrationDb = db,
enableLocalFilesystem = true,
enableNpmDiscovery = true,
} = options;
@ -1701,14 +1706,22 @@ export function pluginLoader(
// 1. Resolve worker entrypoint
// ------------------------------------------------------------------
const workerEntrypoint = resolveWorkerEntrypoint(plugin, localPluginDir);
const packageRoot = resolvePluginPackageRoot(plugin, localPluginDir);
// ------------------------------------------------------------------
// 2. Build host handlers for this plugin
// 2. Apply restricted database migrations before worker startup
// ------------------------------------------------------------------
const databaseNamespace = manifest.database
? (await pluginDatabaseService(migrationDb).applyMigrations(pluginId, manifest, packageRoot))?.namespaceName ?? null
: null;
// ------------------------------------------------------------------
// 3. Build host handlers for this plugin
// ------------------------------------------------------------------
const hostHandlers = buildHostHandlers(pluginId, manifest);
// ------------------------------------------------------------------
// 3. Retrieve plugin config (if any)
// 4. Retrieve plugin config (if any)
// ------------------------------------------------------------------
let config: Record<string, unknown> = {};
try {
@ -1722,7 +1735,7 @@ export function pluginLoader(
}
// ------------------------------------------------------------------
// 4. Spawn worker process
// 5. Spawn worker process
// ------------------------------------------------------------------
const workerOptions: WorkerStartOptions = {
entrypointPath: workerEntrypoint,
@ -1730,6 +1743,7 @@ export function pluginLoader(
config,
instanceInfo,
apiVersion: manifest.apiVersion,
databaseNamespace,
hostHandlers,
autoRestart: true,
};
@ -1750,7 +1764,7 @@ export function pluginLoader(
);
// ------------------------------------------------------------------
// 5. Sync job declarations and register with scheduler
// 6. Sync job declarations and register with scheduler
// ------------------------------------------------------------------
const jobDeclarations = manifest.jobs ?? [];
if (jobDeclarations.length > 0) {
@ -1939,6 +1953,26 @@ function resolveWorkerEntrypoint(
);
}
function resolvePluginPackageRoot(
plugin: PluginRecord & { packagePath?: string | null },
localPluginDir: string,
): string {
if (plugin.packagePath && existsSync(plugin.packagePath)) {
return path.resolve(plugin.packagePath);
}
const packageName = plugin.packageName;
const packageDir = packageName.startsWith("@")
? path.join(localPluginDir, "node_modules", ...packageName.split("/"))
: path.join(localPluginDir, "node_modules", packageName);
if (existsSync(packageDir)) return packageDir;
const directDir = path.join(localPluginDir, packageName);
if (existsSync(directDir)) return directDir;
throw new Error(`Package root not found for plugin "${plugin.pluginKey}"`);
}
function resolveManagedInstallPackageDir(localPluginDir: string, packageName: string): string {
if (packageName.startsWith("@")) {
return path.join(localPluginDir, "node_modules", ...packageName.split("/"));

View file

@ -166,6 +166,8 @@ export interface WorkerStartOptions {
};
/** Host API version. */
apiVersion: number;
/** Host-derived plugin database namespace, when declared. */
databaseNamespace?: string | null;
/** Handlers for worker→host RPC calls. */
hostHandlers: WorkerToHostHandlers;
/** Default timeout for RPC calls (ms). Defaults to 30s. */
@ -828,6 +830,7 @@ export function createPluginWorkerHandle(
config: options.config,
instanceInfo: options.instanceInfo,
apiVersion: options.apiVersion,
databaseNamespace: options.databaseNamespace ?? null,
};
try {