mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-14 01:50:39 +09:00
[codex] Improve runtime and import reliability (#6549)
## Thinking Path > - Paperclip coordinates autonomous company work through local and hosted runtime surfaces. > - Local embedded Postgres and tenant import/export paths are foundational reliability pieces. > - A runtime failure in either path can stop agents or imports before useful work begins. > - The branch included remaining fixes for embedded native library bootstrap and async tenant import handling. > - This pull request groups those runtime/import reliability changes into one standalone PR. > - The benefit is a more robust local runtime and safer cloud tenant import behavior. ## What Changed - Prepared embedded Postgres native runtime before startup in CLI/server/test entrypoints. - Added embedded Postgres native bootstrap coverage. - Added async tenant import job handling and deferred validation coverage. - Kept the runtime/import changes based directly on current `origin/master` after related upstream PRs had already merged. ## Verification - `pnpm --filter @paperclipai/plugin-sdk build` - `NODE_ENV=test pnpm exec vitest run packages/db/src/embedded-postgres-native.test.ts server/src/__tests__/company-portability-routes.test.ts` ## Risks - Medium-low: this touches startup/import paths, but the branch is small and covered by targeted tests. - The embedded Postgres change depends on platform-specific native-library behavior, so CI and follow-up checks should still verify supported runners. > For core feature work, check [`ROADMAP.md`](ROADMAP.md) first and discuss it in `#dev` before opening the PR. Feature PRs that overlap with planned core work may need to be redirected — check the roadmap first. See `CONTRIBUTING.md`. ## Model Used - OpenAI GPT-5 Codex via `codex_local`, tool-enabled coding session; exact context window not exposed by this runtime. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have specified the model used (with version and capability details) - [x] I have checked ROADMAP.md and confirmed this PR does not duplicate planned core work - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots - [x] I have updated relevant documentation to reflect my changes - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge
This commit is contained in:
parent
e43b392a79
commit
ad6effa65c
10 changed files with 511 additions and 19 deletions
|
|
@ -9,6 +9,7 @@ import {
|
||||||
createEmbeddedPostgresLogBuffer,
|
createEmbeddedPostgresLogBuffer,
|
||||||
ensurePostgresDatabase,
|
ensurePostgresDatabase,
|
||||||
formatEmbeddedPostgresError,
|
formatEmbeddedPostgresError,
|
||||||
|
prepareEmbeddedPostgresNativeRuntime,
|
||||||
routines,
|
routines,
|
||||||
} from "@paperclipai/db";
|
} from "@paperclipai/db";
|
||||||
import { eq, inArray } from "drizzle-orm";
|
import { eq, inArray } from "drizzle-orm";
|
||||||
|
|
@ -116,6 +117,7 @@ async function ensureEmbeddedPostgres(dataDir: string, preferredPort: number): P
|
||||||
"Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.",
|
"Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
await prepareEmbeddedPostgresNativeRuntime();
|
||||||
|
|
||||||
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
|
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
|
||||||
const runningPid = readRunningPostmasterPid(postmasterPidFile);
|
const runningPid = readRunningPostmasterPid(postmasterPidFile);
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ import {
|
||||||
runDatabaseRestore,
|
runDatabaseRestore,
|
||||||
createEmbeddedPostgresLogBuffer,
|
createEmbeddedPostgresLogBuffer,
|
||||||
formatEmbeddedPostgresError,
|
formatEmbeddedPostgresError,
|
||||||
|
prepareEmbeddedPostgresNativeRuntime,
|
||||||
} from "@paperclipai/db";
|
} from "@paperclipai/db";
|
||||||
import type { Command } from "commander";
|
import type { Command } from "commander";
|
||||||
import { ensureAgentJwtSecret, loadPaperclipEnvFile, mergePaperclipEnvEntries, readPaperclipEnvEntries, resolvePaperclipEnvFile } from "../config/env.js";
|
import { ensureAgentJwtSecret, loadPaperclipEnvFile, mergePaperclipEnvEntries, readPaperclipEnvEntries, resolvePaperclipEnvFile } from "../config/env.js";
|
||||||
|
|
@ -1059,6 +1060,7 @@ async function ensureEmbeddedPostgres(dataDir: string, preferredPort: number): P
|
||||||
"Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.",
|
"Embedded PostgreSQL support requires dependency `embedded-postgres`. Reinstall dependencies and try again.",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
await prepareEmbeddedPostgresNativeRuntime();
|
||||||
|
|
||||||
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
|
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
|
||||||
const runningPid = readRunningPostmasterPid(postmasterPidFile);
|
const runningPid = readRunningPostmasterPid(postmasterPidFile);
|
||||||
|
|
|
||||||
43
packages/db/src/embedded-postgres-native.test.ts
Normal file
43
packages/db/src/embedded-postgres-native.test.ts
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
import fs from "node:fs";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
import { afterEach, describe, expect, it } from "vitest";
|
||||||
|
import { ensureLinuxSharedLibraryAliases } from "./embedded-postgres-native.js";
|
||||||
|
|
||||||
|
describe("embedded Postgres native runtime", () => {
|
||||||
|
const tempDirs: string[] = [];
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
for (const tempDir of tempDirs.splice(0)) {
|
||||||
|
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it.runIf(process.platform !== "win32")("creates soname aliases for bundled patch-level shared libraries", async () => {
|
||||||
|
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-embedded-pg-libs-"));
|
||||||
|
tempDirs.push(tempDir);
|
||||||
|
fs.writeFileSync(path.join(tempDir, "libicuuc.so.60.2"), "");
|
||||||
|
fs.writeFileSync(path.join(tempDir, "libicui18n.so.60.2"), "");
|
||||||
|
fs.writeFileSync(path.join(tempDir, "README.md"), "");
|
||||||
|
|
||||||
|
const created = await ensureLinuxSharedLibraryAliases(tempDir);
|
||||||
|
|
||||||
|
expect(created.map((file) => path.basename(file)).sort()).toEqual([
|
||||||
|
"libicui18n.so.60",
|
||||||
|
"libicuuc.so.60",
|
||||||
|
]);
|
||||||
|
expect(fs.readlinkSync(path.join(tempDir, "libicuuc.so.60"))).toBe("libicuuc.so.60.2");
|
||||||
|
});
|
||||||
|
|
||||||
|
it.runIf(process.platform !== "win32")("is idempotent when aliases already exist", async () => {
|
||||||
|
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-embedded-pg-libs-"));
|
||||||
|
tempDirs.push(tempDir);
|
||||||
|
fs.writeFileSync(path.join(tempDir, "libicuuc.so.60.2"), "");
|
||||||
|
|
||||||
|
await ensureLinuxSharedLibraryAliases(tempDir);
|
||||||
|
const second = await ensureLinuxSharedLibraryAliases(tempDir);
|
||||||
|
|
||||||
|
expect(second).toEqual([]);
|
||||||
|
expect(fs.readlinkSync(path.join(tempDir, "libicuuc.so.60"))).toBe("libicuuc.so.60.2");
|
||||||
|
});
|
||||||
|
});
|
||||||
85
packages/db/src/embedded-postgres-native.ts
Normal file
85
packages/db/src/embedded-postgres-native.ts
Normal file
|
|
@ -0,0 +1,85 @@
|
||||||
|
import { promises as fs } from "node:fs";
|
||||||
|
import { createRequire } from "node:module";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
const require = createRequire(import.meta.url);
|
||||||
|
|
||||||
|
function resolveNativePackageName(): string | null {
|
||||||
|
if (process.platform !== "linux") return null;
|
||||||
|
|
||||||
|
switch (process.arch) {
|
||||||
|
case "arm64":
|
||||||
|
return "linux-arm64";
|
||||||
|
case "arm":
|
||||||
|
return "linux-arm";
|
||||||
|
case "ia32":
|
||||||
|
return "linux-ia32";
|
||||||
|
case "ppc64":
|
||||||
|
return "linux-ppc64";
|
||||||
|
case "x64":
|
||||||
|
return "linux-x64";
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function pathExists(value: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
await fs.stat(value);
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveEmbeddedPostgresPackageRoot(): string | null {
|
||||||
|
try {
|
||||||
|
const entry = require.resolve("embedded-postgres");
|
||||||
|
return path.dirname(path.dirname(entry));
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function prependPathEnv(name: string, value: string): void {
|
||||||
|
const current = process.env[name] ?? "";
|
||||||
|
const parts = current.split(path.delimiter).filter(Boolean);
|
||||||
|
if (parts.includes(value)) return;
|
||||||
|
process.env[name] = [value, ...parts].join(path.delimiter);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function ensureLinuxSharedLibraryAliases(libDir: string): Promise<string[]> {
|
||||||
|
const entries = await fs.readdir(libDir, { withFileTypes: true });
|
||||||
|
const created: string[] = [];
|
||||||
|
|
||||||
|
for (const entry of entries) {
|
||||||
|
if (!entry.isFile()) continue;
|
||||||
|
const match = entry.name.match(/^(lib.+\.so\.\d+)\.\d+(?:\.\d+)?$/);
|
||||||
|
if (!match) continue;
|
||||||
|
|
||||||
|
const aliasName = match[1];
|
||||||
|
const aliasPath = path.join(libDir, aliasName);
|
||||||
|
try {
|
||||||
|
await fs.symlink(entry.name, aliasPath);
|
||||||
|
created.push(aliasPath);
|
||||||
|
} catch (error) {
|
||||||
|
if ((error as NodeJS.ErrnoException).code === "EEXIST") continue;
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return created;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function prepareEmbeddedPostgresNativeRuntime(): Promise<void> {
|
||||||
|
const nativePackageName = resolveNativePackageName();
|
||||||
|
const packageRoot = resolveEmbeddedPostgresPackageRoot();
|
||||||
|
if (!nativePackageName || !packageRoot) return;
|
||||||
|
|
||||||
|
const nativeRoot = path.resolve(packageRoot, "..", "@embedded-postgres", nativePackageName);
|
||||||
|
const libDir = path.join(nativeRoot, "native", "lib");
|
||||||
|
if (!(await pathExists(libDir))) return;
|
||||||
|
|
||||||
|
prependPathEnv("LD_LIBRARY_PATH", libDir);
|
||||||
|
await ensureLinuxSharedLibraryAliases(libDir);
|
||||||
|
}
|
||||||
|
|
@ -30,6 +30,10 @@ export {
|
||||||
createEmbeddedPostgresLogBuffer,
|
createEmbeddedPostgresLogBuffer,
|
||||||
formatEmbeddedPostgresError,
|
formatEmbeddedPostgresError,
|
||||||
} from "./embedded-postgres-error.js";
|
} from "./embedded-postgres-error.js";
|
||||||
|
export {
|
||||||
|
ensureLinuxSharedLibraryAliases,
|
||||||
|
prepareEmbeddedPostgresNativeRuntime,
|
||||||
|
} from "./embedded-postgres-native.js";
|
||||||
export { issueRelations } from "./schema/issue_relations.js";
|
export { issueRelations } from "./schema/issue_relations.js";
|
||||||
export { issueReferenceMentions } from "./schema/issue_reference_mentions.js";
|
export { issueReferenceMentions } from "./schema/issue_reference_mentions.js";
|
||||||
export * from "./schema/index.js";
|
export * from "./schema/index.js";
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ import { createServer } from "node:net";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { ensurePostgresDatabase, getPostgresDataDirectory } from "./client.js";
|
import { ensurePostgresDatabase, getPostgresDataDirectory } from "./client.js";
|
||||||
import { createEmbeddedPostgresLogBuffer, formatEmbeddedPostgresError } from "./embedded-postgres-error.js";
|
import { createEmbeddedPostgresLogBuffer, formatEmbeddedPostgresError } from "./embedded-postgres-error.js";
|
||||||
|
import { prepareEmbeddedPostgresNativeRuntime } from "./embedded-postgres-native.js";
|
||||||
import { resolveDatabaseTarget } from "./runtime-config.js";
|
import { resolveDatabaseTarget } from "./runtime-config.js";
|
||||||
|
|
||||||
type EmbeddedPostgresInstance = {
|
type EmbeddedPostgresInstance = {
|
||||||
|
|
@ -92,6 +93,7 @@ async function ensureEmbeddedPostgresConnection(
|
||||||
preferredPort: number,
|
preferredPort: number,
|
||||||
): Promise<MigrationConnection> {
|
): Promise<MigrationConnection> {
|
||||||
const EmbeddedPostgres = await loadEmbeddedPostgresCtor();
|
const EmbeddedPostgres = await loadEmbeddedPostgresCtor();
|
||||||
|
await prepareEmbeddedPostgresNativeRuntime();
|
||||||
const selectedPort = await findAvailablePort(preferredPort);
|
const selectedPort = await findAvailablePort(preferredPort);
|
||||||
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
|
const postmasterPidFile = path.resolve(dataDir, "postmaster.pid");
|
||||||
const pgVersionFile = path.resolve(dataDir, "PG_VERSION");
|
const pgVersionFile = path.resolve(dataDir, "PG_VERSION");
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ import net from "node:net";
|
||||||
import os from "node:os";
|
import os from "node:os";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { applyPendingMigrations, ensurePostgresDatabase } from "./client.js";
|
import { applyPendingMigrations, ensurePostgresDatabase } from "./client.js";
|
||||||
|
import { prepareEmbeddedPostgresNativeRuntime } from "./embedded-postgres-native.js";
|
||||||
|
|
||||||
type EmbeddedPostgresInstance = {
|
type EmbeddedPostgresInstance = {
|
||||||
initialise(): Promise<void>;
|
initialise(): Promise<void>;
|
||||||
|
|
@ -48,6 +49,7 @@ function getReservedTestPorts(): Set<number> {
|
||||||
|
|
||||||
async function getEmbeddedPostgresCtor(): Promise<EmbeddedPostgresCtor> {
|
async function getEmbeddedPostgresCtor(): Promise<EmbeddedPostgresCtor> {
|
||||||
const mod = await import("embedded-postgres");
|
const mod = await import("embedded-postgres");
|
||||||
|
await prepareEmbeddedPostgresNativeRuntime();
|
||||||
return mod.default as EmbeddedPostgresCtor;
|
return mod.default as EmbeddedPostgresCtor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -139,6 +139,58 @@ function createExportResult() {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const importRequest = {
|
||||||
|
source: { type: "inline", files: { "COMPANY.md": "---\nname: Test\n---\n" } },
|
||||||
|
include: { company: true, agents: true, projects: false, issues: false },
|
||||||
|
target: { mode: "existing_company", companyId },
|
||||||
|
collisionStrategy: "rename",
|
||||||
|
};
|
||||||
|
|
||||||
|
const cloudHeaders = {
|
||||||
|
"x-paperclip-cloud-stack-id": "stack-alpha",
|
||||||
|
"x-paperclip-cloud-paperclip-company-id": companyId,
|
||||||
|
};
|
||||||
|
|
||||||
|
function cloudTenantActor() {
|
||||||
|
return {
|
||||||
|
type: "board",
|
||||||
|
userId: "cloud-user-1",
|
||||||
|
userName: "Cloud User",
|
||||||
|
userEmail: "cloud-user@example.com",
|
||||||
|
companyIds: [companyId],
|
||||||
|
memberships: [{ companyId, membershipRole: "owner", status: "active" }],
|
||||||
|
isInstanceAdmin: true,
|
||||||
|
source: "cloud_tenant",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createImportResult(action = "updated") {
|
||||||
|
return {
|
||||||
|
company: { id: companyId, action },
|
||||||
|
agents: [{ id: "agent-1" }],
|
||||||
|
warnings: [],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function waitForImportJobStatus(app: express.Express, statusUrl: string, status: string) {
|
||||||
|
for (let attempt = 0; attempt < 20; attempt += 1) {
|
||||||
|
const res = await request(app).get(statusUrl).set(cloudHeaders);
|
||||||
|
if (res.body.job?.status === status) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||||
|
}
|
||||||
|
throw new Error(`Timed out waiting for import job to reach ${status}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function waitForCondition(condition: () => boolean, label: string) {
|
||||||
|
for (let attempt = 0; attempt < 20; attempt += 1) {
|
||||||
|
if (condition()) return;
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||||
|
}
|
||||||
|
throw new Error(`Timed out waiting for ${label}`);
|
||||||
|
}
|
||||||
|
|
||||||
describe.sequential("company portability routes", () => {
|
describe.sequential("company portability routes", () => {
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
vi.clearAllMocks();
|
vi.clearAllMocks();
|
||||||
|
|
@ -426,4 +478,116 @@ describe.sequential("company portability routes", () => {
|
||||||
expect(res.body.error).toContain("Instance admin");
|
expect(res.body.error).toContain("Instance admin");
|
||||||
expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled();
|
expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it.sequential("accepts trusted Cloud async import jobs and reports success by job id", async () => {
|
||||||
|
let resolveImport: (value: ReturnType<typeof createImportResult>) => void = () => undefined;
|
||||||
|
const pendingImport = new Promise<ReturnType<typeof createImportResult>>((resolve) => {
|
||||||
|
resolveImport = resolve;
|
||||||
|
});
|
||||||
|
mockCompanyPortabilityService.importBundle.mockReturnValueOnce(pendingImport);
|
||||||
|
const app = await createApp(cloudTenantActor());
|
||||||
|
|
||||||
|
const accepted = await request(app)
|
||||||
|
.post("/api/companies/import")
|
||||||
|
.set("x-paperclip-cloud-async-import", "1")
|
||||||
|
.set(cloudHeaders)
|
||||||
|
.send(importRequest);
|
||||||
|
|
||||||
|
expect(accepted.status).toBe(202);
|
||||||
|
expect(accepted.body.job.status).toBe("running");
|
||||||
|
expect(accepted.body.statusUrl).toMatch(/^\/api\/companies\/import\/jobs\/tenant-import-/);
|
||||||
|
expect(accepted.body.retryAfterMs).toBe(1000);
|
||||||
|
await waitForCondition(() => mockCompanyPortabilityService.importBundle.mock.calls.length === 1, "import job start");
|
||||||
|
expect(mockCompanyPortabilityService.importBundle).toHaveBeenCalledWith(importRequest, "cloud-user-1");
|
||||||
|
expect(mockLogActivity).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
resolveImport(createImportResult("updated"));
|
||||||
|
const succeeded = await waitForImportJobStatus(app, accepted.body.statusUrl, "succeeded");
|
||||||
|
|
||||||
|
expect(succeeded.status).toBe(200);
|
||||||
|
expect(succeeded.body.job.status).toBe("succeeded");
|
||||||
|
expect(succeeded.body.job.result.companyId).toBe(companyId);
|
||||||
|
expect(succeeded.body.retryAfterMs).toBeUndefined();
|
||||||
|
expect(mockLogActivity).toHaveBeenCalledWith(expect.anything(), expect.objectContaining({
|
||||||
|
action: "company.imported",
|
||||||
|
companyId,
|
||||||
|
details: expect.objectContaining({
|
||||||
|
agentCount: 1,
|
||||||
|
warningCount: 0,
|
||||||
|
companyAction: "updated",
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
const nowSpy = vi.spyOn(Date, "now").mockReturnValue(Date.parse(succeeded.body.job.completedAt) + (5 * 60 * 1000) + 1);
|
||||||
|
try {
|
||||||
|
const expired = await request(app).get(accepted.body.statusUrl).set(cloudHeaders);
|
||||||
|
expect(expired.status).toBe(404);
|
||||||
|
expect(expired.body.error).toBe("Import job not found");
|
||||||
|
} finally {
|
||||||
|
nowSpy.mockRestore();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it.sequential("reports trusted Cloud async import job failures with the tenant error message", async () => {
|
||||||
|
mockCompanyPortabilityService.importBundle.mockRejectedValueOnce(new Error("tenant import exploded"));
|
||||||
|
const app = await createApp(cloudTenantActor());
|
||||||
|
|
||||||
|
const accepted = await request(app)
|
||||||
|
.post("/api/companies/import")
|
||||||
|
.set("x-paperclip-cloud-async-import", "1")
|
||||||
|
.set(cloudHeaders)
|
||||||
|
.send(importRequest);
|
||||||
|
|
||||||
|
expect(accepted.status).toBe(202);
|
||||||
|
const failed = await waitForImportJobStatus(app, accepted.body.statusUrl, "failed");
|
||||||
|
|
||||||
|
expect(failed.status).toBe(200);
|
||||||
|
expect(failed.body.job.status).toBe("failed");
|
||||||
|
expect(failed.body.job.error.message).toBe("tenant import exploded");
|
||||||
|
expect(failed.body.retryAfterMs).toBeUndefined();
|
||||||
|
expect(failed.body.message).toBe("tenant import exploded");
|
||||||
|
expect(mockLogActivity).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it.sequential("accepts trusted Cloud async import jobs before validating the full import payload", async () => {
|
||||||
|
const app = await createApp(cloudTenantActor());
|
||||||
|
|
||||||
|
const accepted = await request(app)
|
||||||
|
.post("/api/companies/import")
|
||||||
|
.set("x-paperclip-cloud-async-import", "1")
|
||||||
|
.set(cloudHeaders)
|
||||||
|
.send({ target: { mode: "existing_company", companyId } });
|
||||||
|
|
||||||
|
expect(accepted.status).toBe(202);
|
||||||
|
expect(accepted.body.job.status).toBe("running");
|
||||||
|
expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
const failed = await waitForImportJobStatus(app, accepted.body.statusUrl, "failed");
|
||||||
|
|
||||||
|
expect(failed.status).toBe(200);
|
||||||
|
expect(failed.body.job.status).toBe("failed");
|
||||||
|
expect(failed.body.job.error.message).toEqual(expect.any(String));
|
||||||
|
expect(mockCompanyPortabilityService.importBundle).not.toHaveBeenCalled();
|
||||||
|
expect(mockLogActivity).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it.sequential("keeps global import apply synchronous when Cloud async opt-in is absent", async () => {
|
||||||
|
mockCompanyPortabilityService.importBundle.mockResolvedValueOnce(createImportResult("created"));
|
||||||
|
const app = await createApp(cloudTenantActor());
|
||||||
|
|
||||||
|
const res = await request(app)
|
||||||
|
.post("/api/companies/import")
|
||||||
|
.set(cloudHeaders)
|
||||||
|
.send(importRequest);
|
||||||
|
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(res.body.company.id).toBe(companyId);
|
||||||
|
expect(res.body.company.action).toBe("created");
|
||||||
|
expect(res.body.job).toBeUndefined();
|
||||||
|
expect(mockCompanyPortabilityService.importBundle).toHaveBeenCalledWith(importRequest, "cloud-user-1");
|
||||||
|
expect(mockLogActivity).toHaveBeenCalledWith(expect.anything(), expect.objectContaining({
|
||||||
|
action: "company.imported",
|
||||||
|
companyId,
|
||||||
|
}));
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import {
|
||||||
inspectMigrations,
|
inspectMigrations,
|
||||||
applyPendingMigrations,
|
applyPendingMigrations,
|
||||||
createEmbeddedPostgresLogBuffer,
|
createEmbeddedPostgresLogBuffer,
|
||||||
|
prepareEmbeddedPostgresNativeRuntime,
|
||||||
reconcilePendingMigrationHistory,
|
reconcilePendingMigrationHistory,
|
||||||
formatDatabaseBackupResult,
|
formatDatabaseBackupResult,
|
||||||
runDatabaseBackup,
|
runDatabaseBackup,
|
||||||
|
|
@ -318,6 +319,7 @@ export async function startServer(): Promise<StartedServer> {
|
||||||
"Embedded PostgreSQL mode requires dependency `embedded-postgres`. Reinstall dependencies (without omitting required packages), or set DATABASE_URL for external Postgres.",
|
"Embedded PostgreSQL mode requires dependency `embedded-postgres`. Reinstall dependencies (without omitting required packages), or set DATABASE_URL for external Postgres.",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
await prepareEmbeddedPostgresNativeRuntime();
|
||||||
|
|
||||||
const dataDir = resolve(config.embeddedPostgresDataDir);
|
const dataDir = resolve(config.embeddedPostgresDataDir);
|
||||||
const configuredPort = config.embeddedPostgresPort;
|
const configuredPort = config.embeddedPostgresPort;
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { randomUUID } from "node:crypto";
|
||||||
import { Router, type Request } from "express";
|
import { Router, type Request } from "express";
|
||||||
import type { Db } from "@paperclipai/db";
|
import type { Db } from "@paperclipai/db";
|
||||||
import {
|
import {
|
||||||
|
|
@ -35,6 +36,8 @@ export function companyRoutes(db: Db, storage?: StorageService) {
|
||||||
const access = accessService(db);
|
const access = accessService(db);
|
||||||
const budgets = budgetService(db);
|
const budgets = budgetService(db);
|
||||||
const feedback = feedbackService(db);
|
const feedback = feedbackService(db);
|
||||||
|
const importJobs = new Map<string, ImportJobRecord>();
|
||||||
|
const importJobTerminalRetentionMs = 5 * 60 * 1000;
|
||||||
|
|
||||||
function parseBooleanQuery(value: unknown) {
|
function parseBooleanQuery(value: unknown) {
|
||||||
return value === true || value === "true" || value === "1";
|
return value === true || value === "true" || value === "1";
|
||||||
|
|
@ -177,27 +180,47 @@ export function companyRoutes(db: Db, storage?: StorageService) {
|
||||||
res.json(preview);
|
res.json(preview);
|
||||||
});
|
});
|
||||||
|
|
||||||
router.post(COMPANY_IMPORT_ROUTE_PATH, validate(companyPortabilityImportSchema), async (req, res) => {
|
router.get("/import/jobs/:jobId", async (req, res) => {
|
||||||
|
assertCloudTenantCaller(req);
|
||||||
|
cleanupTerminalImportJobs(importJobs, importJobTerminalRetentionMs);
|
||||||
|
const job = importJobs.get(req.params.jobId as string);
|
||||||
|
if (!job || job.cloudTenantKey !== cloudTenantRequestKey(req)) {
|
||||||
|
res.status(404).json({ error: "Import job not found" });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
res.json(importJobResponse(job));
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post(COMPANY_IMPORT_ROUTE_PATH, async (req, res) => {
|
||||||
assertBoard(req);
|
assertBoard(req);
|
||||||
assertImportTargetAccess(req, req.body.target);
|
const rawImportBody: unknown = req.body;
|
||||||
const actor = getActorInfo(req);
|
const actor = getActorInfo(req);
|
||||||
const result = await portability.importBundle(req.body, req.actor.type === "board" ? req.actor.userId : null);
|
const boardUserId = req.actor.type === "board" ? req.actor.userId : null;
|
||||||
await logActivity(db, {
|
if (req.header("x-paperclip-cloud-async-import") === "1") {
|
||||||
companyId: result.company.id,
|
assertCloudTenantCaller(req);
|
||||||
actorType: actor.actorType,
|
cleanupTerminalImportJobs(importJobs, importJobTerminalRetentionMs);
|
||||||
actorId: actor.actorId,
|
const job = createImportJob(cloudTenantRequestKey(req));
|
||||||
action: "company.imported",
|
importJobs.set(job.id, job);
|
||||||
entityType: "company",
|
const operation = async () => {
|
||||||
entityId: result.company.id,
|
const importBody = companyPortabilityImportSchema.parse(rawImportBody);
|
||||||
agentId: actor.agentId,
|
assertImportTargetAccess(req, importBody.target);
|
||||||
runId: actor.runId,
|
const activity = importedCompanyActivityContext(actor, importBody.include ?? null);
|
||||||
details: {
|
const result = await portability.importBundle(importBody, boardUserId);
|
||||||
include: req.body.include ?? null,
|
await logImportedCompanyActivity(db, activity, result);
|
||||||
agentCount: result.agents.length,
|
return result;
|
||||||
warningCount: result.warnings.length,
|
};
|
||||||
companyAction: result.company.action,
|
res.status(202).json(importJobAcceptedResponse(job));
|
||||||
},
|
setImmediate(() => {
|
||||||
});
|
void runImportJob(job, operation);
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const importBody = companyPortabilityImportSchema.parse(rawImportBody);
|
||||||
|
assertImportTargetAccess(req, importBody.target);
|
||||||
|
const activity = importedCompanyActivityContext(actor, importBody.include ?? null);
|
||||||
|
const result = await portability.importBundle(importBody, boardUserId);
|
||||||
|
await logImportedCompanyActivity(db, activity, result);
|
||||||
res.json(result);
|
res.json(result);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -419,3 +442,166 @@ export function companyRoutes(db: Db, storage?: StorageService) {
|
||||||
|
|
||||||
return router;
|
return router;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CompanyImportResult = {
|
||||||
|
company: { id: string; action: unknown };
|
||||||
|
agents: unknown[];
|
||||||
|
warnings: unknown[];
|
||||||
|
};
|
||||||
|
|
||||||
|
interface ImportJobRecord {
|
||||||
|
id: string;
|
||||||
|
cloudTenantKey: string;
|
||||||
|
status: "running" | "succeeded" | "failed";
|
||||||
|
createdAt: string;
|
||||||
|
updatedAt: string;
|
||||||
|
completedAt?: string;
|
||||||
|
error?: { message: string };
|
||||||
|
result?: {
|
||||||
|
companyId: string;
|
||||||
|
agentCount: number;
|
||||||
|
warningCount: number;
|
||||||
|
companyAction: unknown;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ImportedCompanyActivityContext {
|
||||||
|
actorType: "user" | "agent";
|
||||||
|
actorId: string;
|
||||||
|
agentId: string | null;
|
||||||
|
runId: string | null;
|
||||||
|
include: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
function assertCloudTenantCaller(req: Request) {
|
||||||
|
if (req.actor.source !== "cloud_tenant") {
|
||||||
|
throw forbidden("Trusted Cloud tenant access required");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function cloudTenantRequestKey(req: Request) {
|
||||||
|
return [
|
||||||
|
req.actor.userId ?? "",
|
||||||
|
req.header("x-paperclip-cloud-stack-id")?.trim() ?? "",
|
||||||
|
req.header("x-paperclip-cloud-paperclip-company-id")?.trim() ?? "",
|
||||||
|
].join(":");
|
||||||
|
}
|
||||||
|
|
||||||
|
function createImportJob(cloudTenantKey: string): ImportJobRecord {
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
return {
|
||||||
|
id: `tenant-import-${randomUUID()}`,
|
||||||
|
cloudTenantKey,
|
||||||
|
status: "running",
|
||||||
|
createdAt: now,
|
||||||
|
updatedAt: now,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function runImportJob(
|
||||||
|
job: ImportJobRecord,
|
||||||
|
operation: () => Promise<CompanyImportResult>,
|
||||||
|
) {
|
||||||
|
try {
|
||||||
|
const result = await operation();
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
job.status = "succeeded";
|
||||||
|
job.updatedAt = now;
|
||||||
|
job.completedAt = now;
|
||||||
|
job.result = {
|
||||||
|
companyId: result.company.id,
|
||||||
|
agentCount: result.agents.length,
|
||||||
|
warningCount: result.warnings.length,
|
||||||
|
companyAction: result.company.action,
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
const now = new Date().toISOString();
|
||||||
|
job.status = "failed";
|
||||||
|
job.updatedAt = now;
|
||||||
|
job.completedAt = now;
|
||||||
|
job.error = { message: errorMessage(error) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function importedCompanyActivityContext(
|
||||||
|
actor: ReturnType<typeof getActorInfo>,
|
||||||
|
include: unknown,
|
||||||
|
): ImportedCompanyActivityContext {
|
||||||
|
return {
|
||||||
|
actorType: actor.actorType,
|
||||||
|
actorId: actor.actorId,
|
||||||
|
agentId: actor.agentId,
|
||||||
|
runId: actor.runId,
|
||||||
|
include,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function logImportedCompanyActivity(
|
||||||
|
db: Db,
|
||||||
|
activity: ImportedCompanyActivityContext,
|
||||||
|
result: CompanyImportResult,
|
||||||
|
) {
|
||||||
|
await logActivity(db, {
|
||||||
|
companyId: result.company.id,
|
||||||
|
actorType: activity.actorType,
|
||||||
|
actorId: activity.actorId,
|
||||||
|
action: "company.imported",
|
||||||
|
entityType: "company",
|
||||||
|
entityId: result.company.id,
|
||||||
|
agentId: activity.agentId,
|
||||||
|
runId: activity.runId,
|
||||||
|
details: {
|
||||||
|
include: activity.include,
|
||||||
|
agentCount: result.agents.length,
|
||||||
|
warningCount: result.warnings.length,
|
||||||
|
companyAction: result.company.action,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function importJobAcceptedResponse(job: ImportJobRecord) {
|
||||||
|
return {
|
||||||
|
job: {
|
||||||
|
id: job.id,
|
||||||
|
status: job.status,
|
||||||
|
},
|
||||||
|
statusUrl: `/api/companies/import/jobs/${encodeURIComponent(job.id)}`,
|
||||||
|
retryAfterMs: 1000,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function importJobResponse(job: ImportJobRecord) {
|
||||||
|
const isTerminal = job.status === "succeeded" || job.status === "failed";
|
||||||
|
const response: Record<string, unknown> = {
|
||||||
|
job: {
|
||||||
|
id: job.id,
|
||||||
|
status: job.status,
|
||||||
|
createdAt: job.createdAt,
|
||||||
|
updatedAt: job.updatedAt,
|
||||||
|
...(job.completedAt ? { completedAt: job.completedAt } : {}),
|
||||||
|
...(job.error ? { error: job.error } : {}),
|
||||||
|
...(job.result ? { result: job.result } : {}),
|
||||||
|
},
|
||||||
|
...(isTerminal ? {} : { retryAfterMs: 1000 }),
|
||||||
|
};
|
||||||
|
if (job.error?.message) {
|
||||||
|
response.error = job.error.message;
|
||||||
|
response.message = job.error.message;
|
||||||
|
response.reason = job.error.message;
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
function cleanupTerminalImportJobs(importJobs: Map<string, ImportJobRecord>, terminalRetentionMs: number) {
|
||||||
|
const now = Date.now();
|
||||||
|
for (const [jobId, job] of importJobs) {
|
||||||
|
if (job.status === "running" || !job.completedAt) continue;
|
||||||
|
if (now - Date.parse(job.completedAt) > terminalRetentionMs) {
|
||||||
|
importJobs.delete(jobId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function errorMessage(error: unknown) {
|
||||||
|
return error instanceof Error && error.message.trim() ? error.message : String(error);
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue