mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-16 10:50:38 +09:00
[codex] Runtime control-plane fixes (#6380)
## Thinking Path > - Paperclip orchestrates AI agents through a server-side control plane > - That control plane depends on reliable issue state transitions, plugin lifecycle behavior, import limits, and startup/shutdown handling > - Several small runtime fixes had accumulated on the working branch and were mixed with larger feature work > - Keeping them separate makes the correctness fixes reviewable and mergeable without waiting for cloud-sync UI work > - This pull request groups the server/runtime control-plane fixes into one standalone branch > - The benefit is a tighter, safer runtime baseline for retries, imports, plugin migrations, feedback flushing, and trusted cloud import handling ## What Changed - Fixed updated issue list pagination sorting and scheduled retry comment handling. - Re-applied pending plugin migrations during hot reload and fixed plugin-schema worktree seed restore. - Hardened public tenant DB startup, portable import body limits, trusted cloud import errors, and trusted cloud tenant import mutation access. - Expired stale request confirmations after user comments. - Added feedback export shutdown hardening so database-unavailable flush loops stop cleanly. - Guarded plugin worker `error` event emission when no listener is registered. ## Verification - `pnpm install --frozen-lockfile --ignore-scripts` - `pnpm --filter @paperclipai/plugin-sdk build` - `npm run install --prefix node_modules/.pnpm/sqlite3@5.1.7/node_modules/sqlite3` - `pnpm exec vitest run server/src/__tests__/issues-service.test.ts server/src/__tests__/plugin-lifecycle-restart.test.ts server/src/__tests__/server-startup-feedback-export.test.ts server/src/__tests__/issue-comment-reopen-routes.test.ts server/src/__tests__/issue-thread-interactions-service.test.ts server/src/__tests__/issue-thread-interaction-routes.test.ts server/src/__tests__/body-limits.test.ts server/src/__tests__/feedback-flush-controller.test.ts server/src/__tests__/error-handler.test.ts server/src/__tests__/board-mutation-guard.test.ts packages/db/src/backup-lib.test.ts` initially exposed local setup issues and two 5s test timeouts. - Rerun after local prereq build: `pnpm exec vitest run --testTimeout 15000 server/src/__tests__/issue-comment-reopen-routes.test.ts server/src/__tests__/issue-thread-interaction-routes.test.ts server/src/__tests__/feedback-flush-controller.test.ts server/src/__tests__/server-startup-feedback-export.test.ts` passed. - Some embedded Postgres-backed tests skipped on this host because local Postgres init was unavailable. ## Risks - Runtime-touching branch: startup/shutdown and issue interaction behavior should be reviewed carefully. - The feedback export change disables repeated flush attempts only for database connection-refused failures; other upload failures still log normally. - The plugin worker error guard avoids process crashes from unhandled EventEmitter errors but may hide errors from code paths that expected an emitted listener. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI Codex, GPT-5-based coding agent with local shell/git/tool use. Exact hosted model ID and context-window size are not exposed by the local Paperclip adapter runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge --------- Co-authored-by: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
f257530537
commit
c91a062326
26 changed files with 1363 additions and 130 deletions
|
|
@ -1,5 +1,5 @@
|
|||
import { isDeepStrictEqual } from "node:util";
|
||||
import { and, asc, eq, inArray } from "drizzle-orm";
|
||||
import { and, asc, eq, inArray, isNotNull } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
documents,
|
||||
|
|
@ -158,6 +158,20 @@ function shouldReturnAcceptedConfirmationToCreatorAgent(args: {
|
|||
return true;
|
||||
}
|
||||
|
||||
function shouldSupersedeRequestConfirmationOnUserComment(interaction: RequestConfirmationInteraction) {
|
||||
return interaction.payload.supersedeOnUserComment === true;
|
||||
}
|
||||
|
||||
function isCommentAtOrAfterInteraction(args: {
|
||||
commentCreatedAt: Date | string;
|
||||
interactionCreatedAt: Date | string;
|
||||
}) {
|
||||
const commentCreatedAtMs = new Date(args.commentCreatedAt).getTime();
|
||||
const interactionCreatedAtMs = new Date(args.interactionCreatedAt).getTime();
|
||||
if (!Number.isFinite(commentCreatedAtMs) || !Number.isFinite(interactionCreatedAtMs)) return false;
|
||||
return commentCreatedAtMs >= interactionCreatedAtMs;
|
||||
}
|
||||
|
||||
function buildTaskCreationOrder(tasks: ReadonlyArray<SuggestTasksInteraction["payload"]["tasks"][number]>) {
|
||||
const taskByClientKey = new Map(tasks.map((task) => [task.clientKey, task] as const));
|
||||
const ordered: Array<SuggestTasksInteraction["payload"]["tasks"][number]> = [];
|
||||
|
|
@ -967,7 +981,7 @@ export function issueThreadInteractionService(db: Db) {
|
|||
|
||||
expireRequestConfirmationsSupersededByComment: async (
|
||||
issue: { id: string; companyId: string },
|
||||
comment: { id: string; authorUserId?: string | null },
|
||||
comment: { id: string; createdAt: Date | string; authorUserId?: string | null },
|
||||
actor: InteractionActor,
|
||||
) => {
|
||||
if (!comment.authorUserId) return [];
|
||||
|
|
@ -984,7 +998,13 @@ export function issueThreadInteractionService(db: Db) {
|
|||
|
||||
const superseded = rows.filter((row) => {
|
||||
const interaction = hydrateInteraction(row) as RequestConfirmationInteraction;
|
||||
return interaction.payload.supersedeOnUserComment === true;
|
||||
return (
|
||||
shouldSupersedeRequestConfirmationOnUserComment(interaction)
|
||||
&& isCommentAtOrAfterInteraction({
|
||||
commentCreatedAt: comment.createdAt,
|
||||
interactionCreatedAt: row.createdAt,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
if (superseded.length === 0) return [];
|
||||
|
|
@ -1020,6 +1040,91 @@ export function issueThreadInteractionService(db: Db) {
|
|||
return expired;
|
||||
},
|
||||
|
||||
expireRequestConfirmationsSupersededByHistoricalComments: async (
|
||||
issue: { id: string; companyId: string },
|
||||
) => {
|
||||
const [rows, comments] = await Promise.all([
|
||||
db
|
||||
.select()
|
||||
.from(issueThreadInteractions)
|
||||
.where(and(
|
||||
eq(issueThreadInteractions.companyId, issue.companyId),
|
||||
eq(issueThreadInteractions.issueId, issue.id),
|
||||
eq(issueThreadInteractions.kind, "request_confirmation"),
|
||||
eq(issueThreadInteractions.status, "pending"),
|
||||
)),
|
||||
db
|
||||
.select()
|
||||
.from(issueComments)
|
||||
.where(and(
|
||||
eq(issueComments.companyId, issue.companyId),
|
||||
eq(issueComments.issueId, issue.id),
|
||||
isNotNull(issueComments.authorUserId),
|
||||
))
|
||||
.orderBy(asc(issueComments.createdAt)),
|
||||
]);
|
||||
|
||||
if (rows.length === 0 || comments.length === 0) return [];
|
||||
|
||||
const now = new Date();
|
||||
const expired: IssueThreadInteraction[] = [];
|
||||
const supersededByComment = new Map<
|
||||
string,
|
||||
{
|
||||
comment: (typeof comments)[number];
|
||||
rowIds: string[];
|
||||
}
|
||||
>();
|
||||
for (const row of rows) {
|
||||
const interaction = hydrateInteraction(row) as RequestConfirmationInteraction;
|
||||
if (!shouldSupersedeRequestConfirmationOnUserComment(interaction)) continue;
|
||||
|
||||
const supersedingComment = comments.find((comment) => isCommentAtOrAfterInteraction({
|
||||
commentCreatedAt: comment.createdAt,
|
||||
interactionCreatedAt: row.createdAt,
|
||||
}));
|
||||
if (!supersedingComment) continue;
|
||||
|
||||
const group = supersededByComment.get(supersedingComment.id);
|
||||
if (group) {
|
||||
group.rowIds.push(row.id);
|
||||
} else {
|
||||
supersededByComment.set(supersedingComment.id, {
|
||||
comment: supersedingComment,
|
||||
rowIds: [row.id],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (const { comment, rowIds } of supersededByComment.values()) {
|
||||
const updatedRows = await db
|
||||
.update(issueThreadInteractions)
|
||||
.set({
|
||||
status: "expired",
|
||||
result: {
|
||||
version: 1,
|
||||
outcome: "superseded_by_comment",
|
||||
commentId: comment.id,
|
||||
},
|
||||
resolvedByAgentId: null,
|
||||
resolvedByUserId: comment.authorUserId,
|
||||
resolvedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(and(
|
||||
inArray(issueThreadInteractions.id, rowIds),
|
||||
eq(issueThreadInteractions.status, "pending"),
|
||||
))
|
||||
.returning();
|
||||
expired.push(...updatedRows.map(hydrateInteraction));
|
||||
}
|
||||
|
||||
if (expired.length > 0) {
|
||||
await touchIssue(db, issue.id);
|
||||
}
|
||||
return expired;
|
||||
},
|
||||
|
||||
expireStaleRequestConfirmationsForIssueDocument: async (
|
||||
issue: { id: string; companyId: string },
|
||||
document: { id: string; key: string; latestRevisionId?: string | null; latestRevisionNumber?: number | null } | null,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { Buffer } from "node:buffer";
|
||||
import { and, asc, desc, eq, gt, inArray, isNull, like, lt, ne, notInArray, or, sql } from "drizzle-orm";
|
||||
import { and, asc, desc, eq, gt, inArray, isNull, like, lt, ne, notInArray, or, sql, type SQL } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import {
|
||||
activityLog,
|
||||
|
|
@ -239,6 +239,8 @@ export interface IssueFilters {
|
|||
q?: string;
|
||||
limit?: number;
|
||||
offset?: number;
|
||||
sortField?: "updated";
|
||||
sortDir?: "asc" | "desc";
|
||||
}
|
||||
|
||||
type IssueRow = typeof issues.$inferSelect;
|
||||
|
|
@ -782,6 +784,43 @@ function latestIssueActivityAt(...values: Array<Date | string | null | undefined
|
|||
return normalized[0] ?? null;
|
||||
}
|
||||
|
||||
function issueListOrderBy(
|
||||
companyId: string,
|
||||
{
|
||||
hasSearch,
|
||||
priorityOrder,
|
||||
searchOrder,
|
||||
sortField,
|
||||
sortDir,
|
||||
}: {
|
||||
hasSearch: boolean;
|
||||
priorityOrder: SQL;
|
||||
searchOrder: SQL;
|
||||
sortField?: IssueFilters["sortField"];
|
||||
sortDir?: IssueFilters["sortDir"];
|
||||
},
|
||||
) {
|
||||
const canonicalLastActivityAt = issueCanonicalLastActivityAtExpr(companyId);
|
||||
if (sortField === "updated") {
|
||||
const activityOrder = sortDir === "asc"
|
||||
? asc(canonicalLastActivityAt)
|
||||
: desc(canonicalLastActivityAt);
|
||||
const updatedOrder = sortDir === "asc" ? asc(issues.updatedAt) : desc(issues.updatedAt);
|
||||
const idOrder = sortDir === "asc" ? asc(issues.id) : desc(issues.id);
|
||||
return hasSearch
|
||||
? [asc(searchOrder), activityOrder, updatedOrder, idOrder]
|
||||
: [activityOrder, updatedOrder, idOrder];
|
||||
}
|
||||
|
||||
return [
|
||||
hasSearch ? asc(searchOrder) : asc(priorityOrder),
|
||||
asc(priorityOrder),
|
||||
desc(canonicalLastActivityAt),
|
||||
desc(issues.updatedAt),
|
||||
desc(issues.id),
|
||||
];
|
||||
}
|
||||
|
||||
async function labelMapForIssues(dbOrTx: any, issueIds: string[]): Promise<Map<string, IssueLabelRow[]>> {
|
||||
const map = new Map<string, IssueLabelRow[]>();
|
||||
if (issueIds.length === 0) return map;
|
||||
|
|
@ -3521,18 +3560,17 @@ export function issueService(db: Db) {
|
|||
ELSE 6
|
||||
END
|
||||
`;
|
||||
const canonicalLastActivityAt = issueCanonicalLastActivityAtExpr(companyId);
|
||||
const baseQuery = db
|
||||
.select(issueListSelect)
|
||||
.from(issues)
|
||||
.where(and(...conditions))
|
||||
.orderBy(
|
||||
hasSearch ? asc(searchOrder) : asc(priorityOrder),
|
||||
asc(priorityOrder),
|
||||
desc(canonicalLastActivityAt),
|
||||
desc(issues.updatedAt),
|
||||
desc(issues.id),
|
||||
);
|
||||
.orderBy(...issueListOrderBy(companyId, {
|
||||
hasSearch,
|
||||
priorityOrder,
|
||||
searchOrder,
|
||||
sortField: filters?.sortField,
|
||||
sortDir: filters?.sortDir,
|
||||
}));
|
||||
const pageQuery = offset > 0
|
||||
? (limit === undefined ? baseQuery.offset(offset) : baseQuery.limit(limit).offset(offset))
|
||||
: (limit === undefined ? baseQuery : baseQuery.limit(limit));
|
||||
|
|
|
|||
|
|
@ -776,19 +776,47 @@ export function pluginLifecycleManager(
|
|||
);
|
||||
}
|
||||
|
||||
log.info(
|
||||
{ pluginId, pluginKey: plugin.pluginKey },
|
||||
"plugin lifecycle: restarting worker",
|
||||
);
|
||||
const supportsRuntimeActivation =
|
||||
typeof pluginLoaderInstance.hasRuntimeServices === "function"
|
||||
&& typeof pluginLoaderInstance.loadSingle === "function"
|
||||
&& typeof pluginLoaderInstance.unloadSingle === "function"
|
||||
&& pluginLoaderInstance.hasRuntimeServices();
|
||||
|
||||
await handle.restart();
|
||||
if (supportsRuntimeActivation) {
|
||||
log.info(
|
||||
{ pluginId, pluginKey: plugin.pluginKey },
|
||||
"plugin lifecycle: reloading plugin (re-reading manifest, re-applying pending migrations, restarting worker)",
|
||||
);
|
||||
|
||||
emitDomain("plugin.worker_stopped", { pluginId, pluginKey: plugin.pluginKey });
|
||||
emitDomain("plugin.worker_started", { pluginId, pluginKey: plugin.pluginKey });
|
||||
// Full deactivate+reactivate cycle (not just `handle.restart()`) so that:
|
||||
// - the manifest is re-read from disk, picking up newly declared
|
||||
// `migrations/*.sql` files and any other manifest changes,
|
||||
// - `applyMigrations` runs idempotently against the up-to-date
|
||||
// migrations directory — pending migrations get applied, already-
|
||||
// applied ones are skipped via the `pluginMigrations` table,
|
||||
// - the worker subprocess is replaced with one loading the freshly
|
||||
// built bundle.
|
||||
//
|
||||
// Bouncing the worker process alone (`handle.restart()`) leaves plugin
|
||||
// schema out of sync with worker code whenever a hot reload adds a new
|
||||
// migration, which makes downstream queries fail against missing tables.
|
||||
await deactivatePluginRuntime(pluginId, plugin.pluginKey);
|
||||
await activateReadyPlugin(pluginId);
|
||||
} else {
|
||||
// No runtime activation services wired in (e.g. state-only test harness)
|
||||
// — fall back to a bare worker subprocess bounce.
|
||||
log.info(
|
||||
{ pluginId, pluginKey: plugin.pluginKey },
|
||||
"plugin lifecycle: restarting worker (runtime services unavailable; skipping migration re-apply)",
|
||||
);
|
||||
await handle.restart();
|
||||
emitDomain("plugin.worker_stopped", { pluginId, pluginKey: plugin.pluginKey });
|
||||
emitDomain("plugin.worker_started", { pluginId, pluginKey: plugin.pluginKey });
|
||||
}
|
||||
|
||||
log.info(
|
||||
{ pluginId, pluginKey: plugin.pluginKey },
|
||||
"plugin lifecycle: worker restarted",
|
||||
"plugin lifecycle: plugin reloaded",
|
||||
);
|
||||
},
|
||||
|
||||
|
|
|
|||
|
|
@ -653,7 +653,9 @@ export function createPluginWorkerHandle(
|
|||
// Handle process errors (e.g. spawn failure)
|
||||
child.on("error", (err) => {
|
||||
log.error({ err: err.message }, "worker process error");
|
||||
emitter.emit("error", { pluginId, error: err });
|
||||
if (emitter.listenerCount("error") > 0) {
|
||||
emitter.emit("error", { pluginId, error: err });
|
||||
}
|
||||
if (status === "starting") {
|
||||
setStatus("crashed");
|
||||
rejectAllPending(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue