paperclip/cli/src/commands/routines.ts
Dotta ad6effa65c
[codex] Improve runtime and import reliability (#6549)
## Thinking Path

> - Paperclip coordinates autonomous company work through local and
hosted runtime surfaces.
> - Local embedded Postgres and tenant import/export paths are
foundational reliability pieces.
> - A runtime failure in either path can stop agents or imports before
useful work begins.
> - The branch included remaining fixes for embedded native library
bootstrap and async tenant import handling.
> - This pull request groups those runtime/import reliability changes
into one standalone PR.
> - The benefit is a more robust local runtime and safer cloud tenant
import behavior.

## What Changed

- Prepared embedded Postgres native runtime before startup in
CLI/server/test entrypoints.
- Added embedded Postgres native bootstrap coverage.
- Added async tenant import job handling and deferred validation
coverage.
- Kept the runtime/import changes based directly on current
`origin/master` after related upstream PRs had already merged.

## Verification

- `pnpm --filter @paperclipai/plugin-sdk build`
- `NODE_ENV=test pnpm exec vitest run
packages/db/src/embedded-postgres-native.test.ts
server/src/__tests__/company-portability-routes.test.ts`

## Risks

- Medium-low: this touches startup/import paths, but the branch is small
and covered by targeted tests.
- The embedded Postgres change depends on platform-specific
native-library behavior, so CI and follow-up checks should still verify
supported runners.

> For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and
discuss it in `#dev` before opening the PR. Feature PRs that overlap
with planned core work may need to be redirected — check the roadmap
first. See `CONTRIBUTING.md`.

## Model Used

- OpenAI GPT-5 Codex via `codex_local`, tool-enabled coding session;
exact context window not exposed by this runtime.

## Checklist

- [x] I have included a thinking path that traces from project context
to this change
- [x] I have specified the model used (with version and capability
details)
- [x] I have checked ROADMAP.md and confirmed this PR does not duplicate
planned core work
- [x] I have run tests locally and they pass
- [x] I have added or updated tests where applicable
- [x] If this change affects the UI, I have included before/after
screenshots
- [x] I have updated relevant documentation to reflect my changes
- [x] I have considered and documented any risks above
- [x] I will address all Greptile and reviewer comments before
requesting merge
2026-05-22 09:57:22 -05:00

354 lines
11 KiB
TypeScript

import fs from "node:fs";
import net from "node:net";
import path from "node:path";
import { Command } from "commander";
import pc from "picocolors";
import {
applyPendingMigrations,
createDb,
createEmbeddedPostgresLogBuffer,
ensurePostgresDatabase,
formatEmbeddedPostgresError,
prepareEmbeddedPostgresNativeRuntime,
routines,
} from "@paperclipai/db";
import { eq, inArray } from "drizzle-orm";
import { loadPaperclipEnvFile } from "../config/env.js";
import { readConfig, resolveConfigPath } from "../config/store.js";
type RoutinesDisableAllOptions = {
config?: string;
dataDir?: string;
companyId?: string;
json?: boolean;
};
type DisableAllRoutinesResult = {
companyId: string;
totalRoutines: number;
pausedCount: number;
alreadyPausedCount: number;
archivedCount: number;
};
type EmbeddedPostgresInstance = {
initialise(): Promise<void>;
start(): Promise<void>;
stop(): Promise<void>;
};
type EmbeddedPostgresCtor = new (opts: {
databaseDir: string;
user: string;
password: string;
port: number;
persistent: boolean;
initdbFlags?: string[];
onLog?: (message: unknown) => void;
onError?: (message: unknown) => void;
}) => EmbeddedPostgresInstance;
type EmbeddedPostgresHandle = {
port: number;
startedByThisProcess: boolean;
stop: () => Promise<void>;
};
type ClosableDb = ReturnType<typeof createDb> & {
$client?: {
end?: (options?: { timeout?: number }) => Promise<void>;
};
};
function nonEmpty(value: string | null | undefined): string | null {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}
async function isPortAvailable(port: number): Promise<boolean> {
return await new Promise<boolean>((resolve) => {
const server = net.createServer();
server.unref();
server.once("error", () => resolve(false));
server.listen(port, "127.0.0.1", () => {
server.close(() => resolve(true));
});
});
}
async function findAvailablePort(preferredPort: number): Promise<number> {
let port = Math.max(1, Math.trunc(preferredPort));
while (!(await isPortAvailable(port))) {
port += 1;
}
return port;
}
function readPidFilePort(postmasterPidFile: string): number | null {
if (!fs.existsSync(postmasterPidFile)) return null;
try {
const lines = fs.readFileSync(postmasterPidFile, "utf8").split("\n");
const port = Number(lines[3]?.trim());
return Number.isInteger(port) && port > 0 ? port : null;
} catch {
return null;
}
}
function readRunningPostmasterPid(postmasterPidFile: string): number | null {
if (!fs.existsSync(postmasterPidFile)) return null;
try {
const pid = Number(fs.readFileSync(postmasterPidFile, "utf8").split("\n")[0]?.trim());
if (!Number.isInteger(pid) || pid <= 0) return null;
process.kill(pid, 0);
return pid;
} catch {
return null;
}
}
async function ensureEmbeddedPostgres(dataDir: string, preferredPort: number): Promise<EmbeddedPostgresHandle> {
const moduleName = "embedded-postgres";
let EmbeddedPostgres: EmbeddedPostgresCtor;
try {
const mod = await import(moduleName);
EmbeddedPostgres = mod.default as EmbeddedPostgresCtor;
} catch {
throw new Error(
"Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.",
);
}
await prepareEmbeddedPostgresNativeRuntime();
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
const runningPid = readRunningPostmasterPid(postmasterPidFile);
if (runningPid) {
return {
port: readPidFilePort(postmasterPidFile) ?? preferredPort,
startedByThisProcess: false,
stop: async () => {},
};
}
const port = await findAvailablePort(preferredPort);
const logBuffer = createEmbeddedPostgresLogBuffer();
const instance = new EmbeddedPostgres({
databaseDir: dataDir,
user: "paperclip",
password: "paperclip",
port,
persistent: true,
initdbFlags: ["--encoding=UTF8", "--locale=C", "--lc-messages=C"],
onLog: logBuffer.append,
onError: logBuffer.append,
});
if (!fs.existsSync(path.resolve(dataDir, "PG_VERSION"))) {
try {
await instance.initialise();
} catch (error) {
throw formatEmbeddedPostgresError(error, {
fallbackMessage: `Failed to initialize embedded PostgreSQL cluster in ${dataDir} on port ${port}`,
recentLogs: logBuffer.getRecentLogs(),
});
}
}
if (fs.existsSync(postmasterPidFile)) {
fs.rmSync(postmasterPidFile, { force: true });
}
try {
await instance.start();
} catch (error) {
throw formatEmbeddedPostgresError(error, {
fallbackMessage: `Failed to start embedded PostgreSQL on port ${port}`,
recentLogs: logBuffer.getRecentLogs(),
});
}
return {
port,
startedByThisProcess: true,
stop: async () => {
await instance.stop();
},
};
}
async function closeDb(db: ClosableDb): Promise<void> {
await db.$client?.end?.({ timeout: 5 }).catch(() => undefined);
}
async function openConfiguredDb(configPath: string): Promise<{
db: ClosableDb;
stop: () => Promise<void>;
}> {
const config = readConfig(configPath);
if (!config) {
throw new Error(`Config not found at ${configPath}.`);
}
let embeddedHandle: EmbeddedPostgresHandle | null = null;
try {
if (config.database.mode === "embedded-postgres") {
embeddedHandle = await ensureEmbeddedPostgres(
config.database.embeddedPostgresDataDir,
config.database.embeddedPostgresPort,
);
const adminConnectionString = `postgres://paperclip:paperclip@127.0.0.1:${embeddedHandle.port}/postgres`;
await ensurePostgresDatabase(adminConnectionString, "paperclip");
const connectionString = `postgres://paperclip:paperclip@127.0.0.1:${embeddedHandle.port}/paperclip`;
await applyPendingMigrations(connectionString);
const db = createDb(connectionString) as ClosableDb;
return {
db,
stop: async () => {
await closeDb(db);
if (embeddedHandle?.startedByThisProcess) {
await embeddedHandle.stop().catch(() => undefined);
}
},
};
}
const connectionString = nonEmpty(config.database.connectionString);
if (!connectionString) {
throw new Error(`Config at ${configPath} does not define a database connection string.`);
}
await applyPendingMigrations(connectionString);
const db = createDb(connectionString) as ClosableDb;
return {
db,
stop: async () => {
await closeDb(db);
},
};
} catch (error) {
if (embeddedHandle?.startedByThisProcess) {
await embeddedHandle.stop().catch(() => undefined);
}
throw error;
}
}
export async function disableAllRoutinesInConfig(
options: Pick<RoutinesDisableAllOptions, "config" | "companyId">,
): Promise<DisableAllRoutinesResult> {
const configPath = resolveConfigPath(options.config);
loadPaperclipEnvFile(configPath);
const companyId =
nonEmpty(options.companyId)
?? nonEmpty(process.env.PAPERCLIP_COMPANY_ID)
?? null;
if (!companyId) {
throw new Error("Company ID is required. Pass --company-id or set PAPERCLIP_COMPANY_ID.");
}
const config = readConfig(configPath);
if (!config) {
throw new Error(`Config not found at ${configPath}.`);
}
let embeddedHandle: EmbeddedPostgresHandle | null = null;
let db: ClosableDb | null = null;
try {
if (config.database.mode === "embedded-postgres") {
embeddedHandle = await ensureEmbeddedPostgres(
config.database.embeddedPostgresDataDir,
config.database.embeddedPostgresPort,
);
const adminConnectionString = `postgres://paperclip:paperclip@127.0.0.1:${embeddedHandle.port}/postgres`;
await ensurePostgresDatabase(adminConnectionString, "paperclip");
const connectionString = `postgres://paperclip:paperclip@127.0.0.1:${embeddedHandle.port}/paperclip`;
await applyPendingMigrations(connectionString);
db = createDb(connectionString) as ClosableDb;
} else {
const connectionString = nonEmpty(config.database.connectionString);
if (!connectionString) {
throw new Error(`Config at ${configPath} does not define a database connection string.`);
}
await applyPendingMigrations(connectionString);
db = createDb(connectionString) as ClosableDb;
}
const existing = await db
.select({
id: routines.id,
status: routines.status,
})
.from(routines)
.where(eq(routines.companyId, companyId));
const alreadyPausedCount = existing.filter((routine) => routine.status === "paused").length;
const archivedCount = existing.filter((routine) => routine.status === "archived").length;
const idsToPause = existing
.filter((routine) => routine.status !== "paused" && routine.status !== "archived")
.map((routine) => routine.id);
if (idsToPause.length > 0) {
await db
.update(routines)
.set({
status: "paused",
updatedAt: new Date(),
})
.where(inArray(routines.id, idsToPause));
}
return {
companyId,
totalRoutines: existing.length,
pausedCount: idsToPause.length,
alreadyPausedCount,
archivedCount,
};
} finally {
if (db) {
await closeDb(db);
}
if (embeddedHandle?.startedByThisProcess) {
await embeddedHandle.stop().catch(() => undefined);
}
}
}
export async function disableAllRoutinesCommand(options: RoutinesDisableAllOptions): Promise<void> {
const result = await disableAllRoutinesInConfig(options);
if (options.json) {
console.log(JSON.stringify(result, null, 2));
return;
}
if (result.totalRoutines === 0) {
console.log(pc.dim(`No routines found for company ${result.companyId}.`));
return;
}
console.log(
`Paused ${result.pausedCount} routine(s) for company ${result.companyId} ` +
`(${result.alreadyPausedCount} already paused, ${result.archivedCount} archived).`,
);
}
export function registerRoutineCommands(program: Command): void {
const routinesCommand = program.command("routines").description("Local routine maintenance commands");
routinesCommand
.command("disable-all")
.description("Pause all non-archived routines in the configured local instance for one company")
.option("-c, --config <path>", "Path to config file")
.option("-d, --data-dir <path>", "Paperclip data directory root (isolates state from ~/.paperclip)")
.option("-C, --company-id <id>", "Company ID")
.option("--json", "Output raw JSON")
.action(async (opts: RoutinesDisableAllOptions) => {
try {
await disableAllRoutinesCommand(opts);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(pc.red(message));
process.exit(1);
}
});
}