2026-03-20 07:21:38 -05:00
|
|
|
import { randomUUID } from "node:crypto";
|
|
|
|
|
import fs from "node:fs";
|
|
|
|
|
import net from "node:net";
|
|
|
|
|
import os from "node:os";
|
|
|
|
|
import path from "node:path";
|
|
|
|
|
import { eq } from "drizzle-orm";
|
|
|
|
|
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
|
|
|
|
|
import {
|
|
|
|
|
agents,
|
|
|
|
|
applyPendingMigrations,
|
|
|
|
|
companies,
|
|
|
|
|
createDb,
|
|
|
|
|
ensurePostgresDatabase,
|
|
|
|
|
heartbeatRuns,
|
|
|
|
|
issues,
|
|
|
|
|
projects,
|
|
|
|
|
routineRuns,
|
|
|
|
|
routines,
|
|
|
|
|
} from "@paperclipai/db";
|
|
|
|
|
import { issueService } from "../services/issues.ts";
|
|
|
|
|
import { routineService } from "../services/routines.ts";
|
|
|
|
|
|
|
|
|
|
type EmbeddedPostgresInstance = {
|
|
|
|
|
initialise(): Promise<void>;
|
|
|
|
|
start(): Promise<void>;
|
|
|
|
|
stop(): Promise<void>;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
type EmbeddedPostgresCtor = new (opts: {
|
|
|
|
|
databaseDir: string;
|
|
|
|
|
user: string;
|
|
|
|
|
password: string;
|
|
|
|
|
port: number;
|
|
|
|
|
persistent: boolean;
|
|
|
|
|
initdbFlags?: string[];
|
|
|
|
|
onLog?: (message: unknown) => void;
|
|
|
|
|
onError?: (message: unknown) => void;
|
|
|
|
|
}) => EmbeddedPostgresInstance;
|
|
|
|
|
|
|
|
|
|
async function getEmbeddedPostgresCtor(): Promise<EmbeddedPostgresCtor> {
|
|
|
|
|
const mod = await import("embedded-postgres");
|
|
|
|
|
return mod.default as EmbeddedPostgresCtor;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function getAvailablePort(): Promise<number> {
|
|
|
|
|
return await new Promise((resolve, reject) => {
|
|
|
|
|
const server = net.createServer();
|
|
|
|
|
server.unref();
|
|
|
|
|
server.on("error", reject);
|
|
|
|
|
server.listen(0, "127.0.0.1", () => {
|
|
|
|
|
const address = server.address();
|
|
|
|
|
if (!address || typeof address === "string") {
|
|
|
|
|
server.close(() => reject(new Error("Failed to allocate test port")));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
const { port } = address;
|
|
|
|
|
server.close((error) => {
|
|
|
|
|
if (error) reject(error);
|
|
|
|
|
else resolve(port);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function startTempDatabase() {
|
|
|
|
|
const dataDir = fs.mkdtempSync(path.join(os.tmpdir(), "paperclip-routines-service-"));
|
|
|
|
|
const port = await getAvailablePort();
|
|
|
|
|
const EmbeddedPostgres = await getEmbeddedPostgresCtor();
|
|
|
|
|
const instance = new EmbeddedPostgres({
|
|
|
|
|
databaseDir: dataDir,
|
|
|
|
|
user: "paperclip",
|
|
|
|
|
password: "paperclip",
|
|
|
|
|
port,
|
|
|
|
|
persistent: true,
|
|
|
|
|
initdbFlags: ["--encoding=UTF8", "--locale=C"],
|
|
|
|
|
onLog: () => {},
|
|
|
|
|
onError: () => {},
|
|
|
|
|
});
|
|
|
|
|
await instance.initialise();
|
|
|
|
|
await instance.start();
|
|
|
|
|
|
|
|
|
|
const adminConnectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/postgres`;
|
|
|
|
|
await ensurePostgresDatabase(adminConnectionString, "paperclip");
|
|
|
|
|
const connectionString = `postgres://paperclip:paperclip@127.0.0.1:${port}/paperclip`;
|
|
|
|
|
await applyPendingMigrations(connectionString);
|
|
|
|
|
return { connectionString, dataDir, instance };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
describe("routine service live-execution coalescing", () => {
|
|
|
|
|
let db!: ReturnType<typeof createDb>;
|
|
|
|
|
let instance: EmbeddedPostgresInstance | null = null;
|
|
|
|
|
let dataDir = "";
|
|
|
|
|
|
|
|
|
|
beforeAll(async () => {
|
|
|
|
|
const started = await startTempDatabase();
|
|
|
|
|
db = createDb(started.connectionString);
|
|
|
|
|
instance = started.instance;
|
|
|
|
|
dataDir = started.dataDir;
|
|
|
|
|
}, 20_000);
|
|
|
|
|
|
|
|
|
|
afterEach(async () => {
|
|
|
|
|
await db.delete(routineRuns);
|
|
|
|
|
await db.delete(routines);
|
|
|
|
|
await db.delete(heartbeatRuns);
|
|
|
|
|
await db.delete(issues);
|
|
|
|
|
await db.delete(projects);
|
|
|
|
|
await db.delete(agents);
|
|
|
|
|
await db.delete(companies);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
afterAll(async () => {
|
|
|
|
|
await instance?.stop();
|
|
|
|
|
if (dataDir) {
|
|
|
|
|
fs.rmSync(dataDir, { recursive: true, force: true });
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
async function seedFixture() {
|
|
|
|
|
const companyId = randomUUID();
|
|
|
|
|
const agentId = randomUUID();
|
|
|
|
|
const projectId = randomUUID();
|
|
|
|
|
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
|
2026-03-20 08:11:19 -05:00
|
|
|
const wakeups: Array<{
|
|
|
|
|
agentId: string;
|
|
|
|
|
opts: {
|
|
|
|
|
source?: string;
|
|
|
|
|
triggerDetail?: string;
|
|
|
|
|
reason?: string | null;
|
|
|
|
|
payload?: Record<string, unknown> | null;
|
|
|
|
|
requestedByActorType?: "user" | "agent" | "system";
|
|
|
|
|
requestedByActorId?: string | null;
|
|
|
|
|
contextSnapshot?: Record<string, unknown>;
|
|
|
|
|
};
|
|
|
|
|
}> = [];
|
2026-03-20 07:21:38 -05:00
|
|
|
|
|
|
|
|
await db.insert(companies).values({
|
|
|
|
|
id: companyId,
|
|
|
|
|
name: "Paperclip",
|
|
|
|
|
issuePrefix,
|
|
|
|
|
requireBoardApprovalForNewAgents: false,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await db.insert(agents).values({
|
|
|
|
|
id: agentId,
|
|
|
|
|
companyId,
|
|
|
|
|
name: "CodexCoder",
|
|
|
|
|
role: "engineer",
|
|
|
|
|
status: "active",
|
|
|
|
|
adapterType: "codex_local",
|
|
|
|
|
adapterConfig: {},
|
|
|
|
|
runtimeConfig: {},
|
|
|
|
|
permissions: {},
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await db.insert(projects).values({
|
|
|
|
|
id: projectId,
|
|
|
|
|
companyId,
|
|
|
|
|
name: "Routines",
|
|
|
|
|
status: "in_progress",
|
|
|
|
|
});
|
|
|
|
|
|
2026-03-20 08:11:19 -05:00
|
|
|
const svc = routineService(db, {
|
|
|
|
|
heartbeat: {
|
|
|
|
|
wakeup: async (agentId, opts) => {
|
|
|
|
|
wakeups.push({ agentId, opts });
|
|
|
|
|
return null;
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
});
|
2026-03-20 07:21:38 -05:00
|
|
|
const issueSvc = issueService(db);
|
|
|
|
|
const routine = await svc.create(
|
|
|
|
|
companyId,
|
|
|
|
|
{
|
|
|
|
|
projectId,
|
|
|
|
|
goalId: null,
|
|
|
|
|
parentIssueId: null,
|
|
|
|
|
title: "ascii frog",
|
|
|
|
|
description: "Run the frog routine",
|
|
|
|
|
assigneeAgentId: agentId,
|
|
|
|
|
priority: "medium",
|
|
|
|
|
status: "active",
|
|
|
|
|
concurrencyPolicy: "coalesce_if_active",
|
|
|
|
|
catchUpPolicy: "skip_missed",
|
|
|
|
|
},
|
|
|
|
|
{},
|
|
|
|
|
);
|
|
|
|
|
|
2026-03-20 08:11:19 -05:00
|
|
|
return { companyId, agentId, issueSvc, projectId, routine, svc, wakeups };
|
2026-03-20 07:21:38 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
it("creates a fresh execution issue when the previous routine issue is open but idle", async () => {
|
|
|
|
|
const { companyId, issueSvc, routine, svc } = await seedFixture();
|
|
|
|
|
const previousRunId = randomUUID();
|
|
|
|
|
const previousIssue = await issueSvc.create(companyId, {
|
|
|
|
|
projectId: routine.projectId,
|
|
|
|
|
title: routine.title,
|
|
|
|
|
description: routine.description,
|
|
|
|
|
status: "todo",
|
|
|
|
|
priority: routine.priority,
|
|
|
|
|
assigneeAgentId: routine.assigneeAgentId,
|
|
|
|
|
originKind: "routine_execution",
|
|
|
|
|
originId: routine.id,
|
|
|
|
|
originRunId: previousRunId,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await db.insert(routineRuns).values({
|
|
|
|
|
id: previousRunId,
|
|
|
|
|
companyId,
|
|
|
|
|
routineId: routine.id,
|
|
|
|
|
triggerId: null,
|
|
|
|
|
source: "manual",
|
|
|
|
|
status: "issue_created",
|
|
|
|
|
triggeredAt: new Date("2026-03-20T12:00:00.000Z"),
|
|
|
|
|
linkedIssueId: previousIssue.id,
|
|
|
|
|
completedAt: new Date("2026-03-20T12:00:00.000Z"),
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
const detailBefore = await svc.getDetail(routine.id);
|
|
|
|
|
expect(detailBefore?.activeIssue).toBeNull();
|
|
|
|
|
|
|
|
|
|
const run = await svc.runRoutine(routine.id, { source: "manual" });
|
|
|
|
|
expect(run.status).toBe("issue_created");
|
|
|
|
|
expect(run.linkedIssueId).not.toBe(previousIssue.id);
|
|
|
|
|
|
|
|
|
|
const routineIssues = await db
|
|
|
|
|
.select({
|
|
|
|
|
id: issues.id,
|
|
|
|
|
originRunId: issues.originRunId,
|
|
|
|
|
})
|
|
|
|
|
.from(issues)
|
|
|
|
|
.where(eq(issues.originId, routine.id));
|
|
|
|
|
|
|
|
|
|
expect(routineIssues).toHaveLength(2);
|
|
|
|
|
expect(routineIssues.map((issue) => issue.id)).toContain(previousIssue.id);
|
|
|
|
|
expect(routineIssues.map((issue) => issue.id)).toContain(run.linkedIssueId);
|
|
|
|
|
});
|
|
|
|
|
|
2026-03-20 08:11:19 -05:00
|
|
|
it("wakes the assignee when a routine creates a fresh execution issue", async () => {
|
|
|
|
|
const { agentId, routine, svc, wakeups } = await seedFixture();
|
|
|
|
|
|
|
|
|
|
const run = await svc.runRoutine(routine.id, { source: "manual" });
|
|
|
|
|
|
|
|
|
|
expect(run.status).toBe("issue_created");
|
|
|
|
|
expect(run.linkedIssueId).toBeTruthy();
|
|
|
|
|
expect(wakeups).toEqual([
|
|
|
|
|
{
|
|
|
|
|
agentId,
|
|
|
|
|
opts: {
|
|
|
|
|
source: "assignment",
|
|
|
|
|
triggerDetail: "system",
|
|
|
|
|
reason: "issue_assigned",
|
|
|
|
|
payload: { issueId: run.linkedIssueId, mutation: "create" },
|
|
|
|
|
requestedByActorType: undefined,
|
|
|
|
|
requestedByActorId: null,
|
|
|
|
|
contextSnapshot: { issueId: run.linkedIssueId, source: "routine.dispatch" },
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
]);
|
|
|
|
|
});
|
|
|
|
|
|
2026-03-20 07:21:38 -05:00
|
|
|
it("coalesces only when the existing routine issue has a live execution run", async () => {
|
|
|
|
|
const { agentId, companyId, issueSvc, routine, svc } = await seedFixture();
|
|
|
|
|
const previousRunId = randomUUID();
|
|
|
|
|
const liveHeartbeatRunId = randomUUID();
|
|
|
|
|
const previousIssue = await issueSvc.create(companyId, {
|
|
|
|
|
projectId: routine.projectId,
|
|
|
|
|
title: routine.title,
|
|
|
|
|
description: routine.description,
|
|
|
|
|
status: "in_progress",
|
|
|
|
|
priority: routine.priority,
|
|
|
|
|
assigneeAgentId: routine.assigneeAgentId,
|
|
|
|
|
originKind: "routine_execution",
|
|
|
|
|
originId: routine.id,
|
|
|
|
|
originRunId: previousRunId,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await db.insert(routineRuns).values({
|
|
|
|
|
id: previousRunId,
|
|
|
|
|
companyId,
|
|
|
|
|
routineId: routine.id,
|
|
|
|
|
triggerId: null,
|
|
|
|
|
source: "manual",
|
|
|
|
|
status: "issue_created",
|
|
|
|
|
triggeredAt: new Date("2026-03-20T12:00:00.000Z"),
|
|
|
|
|
linkedIssueId: previousIssue.id,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await db.insert(heartbeatRuns).values({
|
|
|
|
|
id: liveHeartbeatRunId,
|
|
|
|
|
companyId,
|
|
|
|
|
agentId,
|
|
|
|
|
invocationSource: "assignment",
|
|
|
|
|
triggerDetail: "system",
|
|
|
|
|
status: "running",
|
|
|
|
|
contextSnapshot: { issueId: previousIssue.id },
|
|
|
|
|
startedAt: new Date("2026-03-20T12:01:00.000Z"),
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await db
|
|
|
|
|
.update(issues)
|
|
|
|
|
.set({
|
|
|
|
|
checkoutRunId: liveHeartbeatRunId,
|
|
|
|
|
executionRunId: liveHeartbeatRunId,
|
|
|
|
|
executionLockedAt: new Date("2026-03-20T12:01:00.000Z"),
|
|
|
|
|
})
|
|
|
|
|
.where(eq(issues.id, previousIssue.id));
|
|
|
|
|
|
|
|
|
|
const detailBefore = await svc.getDetail(routine.id);
|
|
|
|
|
expect(detailBefore?.activeIssue?.id).toBe(previousIssue.id);
|
|
|
|
|
|
|
|
|
|
const run = await svc.runRoutine(routine.id, { source: "manual" });
|
|
|
|
|
expect(run.status).toBe("coalesced");
|
|
|
|
|
expect(run.linkedIssueId).toBe(previousIssue.id);
|
|
|
|
|
expect(run.coalescedIntoRunId).toBe(previousRunId);
|
|
|
|
|
|
|
|
|
|
const routineIssues = await db
|
|
|
|
|
.select({ id: issues.id })
|
|
|
|
|
.from(issues)
|
|
|
|
|
.where(eq(issues.originId, routine.id));
|
|
|
|
|
|
|
|
|
|
expect(routineIssues).toHaveLength(1);
|
|
|
|
|
expect(routineIssues[0]?.id).toBe(previousIssue.id);
|
|
|
|
|
});
|
|
|
|
|
});
|