mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-14 01:50:39 +09:00
Add local environment lifecycle (#4297)
## Thinking Path > - Paperclip orchestrates AI agents for zero-human companies. > - Every heartbeat run needs a concrete place where the agent's adapter process executes. > - Today that execution location is implicitly the local machine, which makes it hard to track, audit, and manage as a first-class runtime concern. > - The first step is to represent the current local execution path explicitly without changing how users experience agent runs. > - This pull request adds core Environment and Environment Lease records, then routes existing local heartbeat execution through a default `Local` environment. > - The benefit is that local runs remain behavior-preserving while the system now has durable environment identity, lease lifecycle tracking, and activity records for execution placement. ## What Changed - Added `environments` and `environment_leases` database tables, schema exports, and migration `0065_environments.sql`. - Added shared environment constants, TypeScript types, and validators for environment drivers, statuses, lease policies, lease statuses, and cleanup states. - Added `environmentService` for listing, reading, creating, updating, and ensuring company-scoped environments. - Added environment lease lifecycle operations for acquire, metadata update, single-lease release, and run-wide release. - Updated heartbeat execution to lazily ensure a company-scoped default `Local` environment before adapter execution. - Updated heartbeat execution to acquire an ephemeral local environment lease, write `paperclipEnvironment` into the run context snapshot, and release active leases during run finalization. - Added activity log events for environment lease acquisition and release. - Added tests for environment service behavior and the local heartbeat environment lifecycle. - Added a CI-follow-up heartbeat guard so deferred issue comment wakes are promoted before automatic missing-comment retries, with focused batching test coverage. ## Verification Local verification run for this branch: - `pnpm -r typecheck` - `pnpm build` - `pnpm exec vitest run server/src/__tests__/environment-service.test.ts server/src/__tests__/heartbeat-local-environment.test.ts --pool=forks` Additional reviewer/CI verification: - Confirm `pnpm-lock.yaml` is not modified. - Confirm `pnpm test:run` passes in CI. - Confirm `PAPERCLIP_E2E_SKIP_LLM=true pnpm run test:e2e` passes in CI. - Confirm a local heartbeat run creates one active `Local` environment when needed, records one lease for the run, releases the lease when the run finishes, and includes `paperclipEnvironment` in the run context snapshot. Screenshots: not applicable; this PR has no UI changes. ## Risks - Migration risk: introduces two new tables and a new migration journal entry. Review should verify company scoping, indexes, foreign keys, and enum defaults are correct. - Lifecycle risk: heartbeat finalization now releases environment leases in addition to existing runtime cleanup. A finalization bug could leave stale active leases or mark a failed run's lease incorrectly. - Behavior-preservation risk: local adapter execution should remain unchanged apart from environment bookkeeping. Review should pay attention to the heartbeat path around context snapshot updates and final cleanup ordering. - Activity volume risk: each heartbeat run now logs lease acquisition and release events, increasing activity log volume by two records per run. ## Model Used OpenAI GPT-5.4 via Codex CLI. Capabilities used: repository inspection, TypeScript implementation review, local test/build execution, and PR-description drafting. ## Checklist - [x] I have included a thinking path that traces from project context to this change - [x] I have run tests locally and they pass - [x] I have added or updated tests where applicable - [x] If this change affects the UI, I have included before/after screenshots (N/A: no UI changes) - [x] I have updated relevant documentation to reflect my changes (N/A: no user-facing docs or commands changed) - [x] I have considered and documented any risks above - [x] I will address all Greptile and reviewer comments before requesting merge
This commit is contained in:
parent
b69b563aa8
commit
13551b2bac
17 changed files with 1098 additions and 2 deletions
224
server/src/__tests__/environment-service.test.ts
Normal file
224
server/src/__tests__/environment-service.test.ts
Normal file
|
|
@ -0,0 +1,224 @@
|
|||
import { randomUUID } from "node:crypto";
|
||||
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { agents, companies, createDb, environmentLeases, environments, heartbeatRuns } from "@paperclipai/db";
|
||||
import {
|
||||
getEmbeddedPostgresTestSupport,
|
||||
startEmbeddedPostgresTestDatabase,
|
||||
} from "./helpers/embedded-postgres.js";
|
||||
import { environmentService } from "../services/environments.ts";
|
||||
|
||||
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
|
||||
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
|
||||
|
||||
if (!embeddedPostgresSupport.supported) {
|
||||
console.warn(
|
||||
`Skipping embedded Postgres environment service tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
|
||||
);
|
||||
}
|
||||
|
||||
describeEmbeddedPostgres("environmentService leases", () => {
|
||||
let stopDb: (() => Promise<void>) | null = null;
|
||||
let db!: ReturnType<typeof createDb>;
|
||||
let svc!: ReturnType<typeof environmentService>;
|
||||
|
||||
beforeAll(async () => {
|
||||
const started = await startEmbeddedPostgresTestDatabase("environment-service");
|
||||
stopDb = started.stop;
|
||||
db = createDb(started.connectionString);
|
||||
svc = environmentService(db);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await db.delete(environmentLeases);
|
||||
await db.delete(heartbeatRuns);
|
||||
await db.delete(agents);
|
||||
await db.delete(environments);
|
||||
await db.delete(companies);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await stopDb?.();
|
||||
});
|
||||
|
||||
async function seedEnvironment() {
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
const environmentId = randomUUID();
|
||||
const runId = randomUUID();
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Acme",
|
||||
status: "active",
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
await db.insert(agents).values({
|
||||
id: agentId,
|
||||
companyId,
|
||||
name: "CodexCoder",
|
||||
role: "engineer",
|
||||
status: "active",
|
||||
adapterType: "codex_local",
|
||||
adapterConfig: {},
|
||||
runtimeConfig: {},
|
||||
permissions: {},
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
await db.insert(environments).values({
|
||||
id: environmentId,
|
||||
companyId,
|
||||
name: "Local",
|
||||
driver: "local",
|
||||
status: "active",
|
||||
config: {},
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: runId,
|
||||
companyId,
|
||||
agentId,
|
||||
invocationSource: "manual",
|
||||
status: "running",
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
return { companyId, agentId, environmentId, runId };
|
||||
}
|
||||
|
||||
it("acquires and releases a lease for a run", async () => {
|
||||
const { companyId, environmentId, runId } = await seedEnvironment();
|
||||
|
||||
const lease = await svc.acquireLease({
|
||||
companyId,
|
||||
environmentId,
|
||||
heartbeatRunId: runId,
|
||||
metadata: { driver: "local" },
|
||||
});
|
||||
|
||||
expect(lease.status).toBe("active");
|
||||
expect(lease.heartbeatRunId).toBe(runId);
|
||||
|
||||
const released = await svc.releaseLease(lease.id);
|
||||
|
||||
expect(released?.status).toBe("released");
|
||||
expect(released?.releasedAt).not.toBeNull();
|
||||
});
|
||||
|
||||
it("releases all active leases for a run without touching unrelated rows", async () => {
|
||||
const { companyId, agentId, environmentId, runId } = await seedEnvironment();
|
||||
const otherRunId = randomUUID();
|
||||
|
||||
await db.insert(heartbeatRuns).values({
|
||||
id: otherRunId,
|
||||
companyId,
|
||||
agentId,
|
||||
invocationSource: "manual",
|
||||
status: "running",
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
const targetLease = await svc.acquireLease({
|
||||
companyId,
|
||||
environmentId,
|
||||
heartbeatRunId: runId,
|
||||
});
|
||||
const otherLease = await svc.acquireLease({
|
||||
companyId,
|
||||
environmentId,
|
||||
heartbeatRunId: otherRunId,
|
||||
});
|
||||
|
||||
const released = await svc.releaseLeasesForRun(runId);
|
||||
|
||||
expect(released.map((lease) => lease.id)).toEqual([targetLease.id]);
|
||||
|
||||
const stillActive = await svc.listLeases(environmentId, { status: "active" });
|
||||
expect(stillActive.map((lease) => lease.id)).toEqual([otherLease.id]);
|
||||
});
|
||||
|
||||
it("creates and then reuses the default local environment for a company", async () => {
|
||||
const companyId = randomUUID();
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Acme",
|
||||
status: "active",
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
const created = await svc.ensureLocalEnvironment(companyId);
|
||||
const reused = await svc.ensureLocalEnvironment(companyId);
|
||||
|
||||
expect(created.driver).toBe("local");
|
||||
expect(reused.id).toBe(created.id);
|
||||
|
||||
const rows = await db.select().from(environments).where(eq(environments.companyId, companyId));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0]?.name).toBe("Local");
|
||||
});
|
||||
|
||||
it("leaves an existing default local environment untouched", async () => {
|
||||
const companyId = randomUUID();
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Acme",
|
||||
status: "active",
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
const archivedAt = new Date("2025-01-01T00:00:00.000Z");
|
||||
const [existing] = await db
|
||||
.insert(environments)
|
||||
.values({
|
||||
companyId,
|
||||
name: "Archived Local",
|
||||
description: "Operator-managed local environment",
|
||||
driver: "local",
|
||||
status: "archived",
|
||||
config: { shell: "zsh" },
|
||||
metadata: { owner: "operator" },
|
||||
createdAt: archivedAt,
|
||||
updatedAt: archivedAt,
|
||||
})
|
||||
.returning();
|
||||
|
||||
const ensured = await svc.ensureLocalEnvironment(companyId);
|
||||
|
||||
expect(ensured.id).toBe(existing?.id);
|
||||
expect(ensured.name).toBe("Archived Local");
|
||||
expect(ensured.status).toBe("archived");
|
||||
expect(ensured.metadata).toEqual({ owner: "operator" });
|
||||
|
||||
const rows = await db.select().from(environments).where(eq(environments.companyId, companyId));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0]?.updatedAt.toISOString()).toBe(archivedAt.toISOString());
|
||||
});
|
||||
|
||||
it("deduplicates concurrent default local environment creation", async () => {
|
||||
const companyId = randomUUID();
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Acme",
|
||||
status: "active",
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
const results = await Promise.all(
|
||||
Array.from({ length: 8 }, () => svc.ensureLocalEnvironment(companyId)),
|
||||
);
|
||||
|
||||
expect(new Set(results.map((environment) => environment.id)).size).toBe(1);
|
||||
|
||||
const rows = await db.select().from(environments).where(eq(environments.companyId, companyId));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0]?.driver).toBe("local");
|
||||
expect(rows[0]?.status).toBe("active");
|
||||
});
|
||||
});
|
||||
|
|
@ -1150,7 +1150,6 @@ describe("heartbeat comment wake batching", () => {
|
|||
|
||||
gateway.releaseFirstWait();
|
||||
|
||||
await waitFor(() => gateway.getAgentPayloads().length === 2, 90_000);
|
||||
await waitFor(async () => {
|
||||
const runs = await db
|
||||
.select()
|
||||
|
|
@ -1159,6 +1158,7 @@ describe("heartbeat comment wake batching", () => {
|
|||
.orderBy(asc(heartbeatRuns.createdAt));
|
||||
return runs.length === 1 && runs[0]?.status === "succeeded";
|
||||
}, 90_000);
|
||||
expect(gateway.getAgentPayloads().length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
const mentionedRuns = await db
|
||||
.select()
|
||||
|
|
@ -1171,6 +1171,28 @@ describe("heartbeat comment wake batching", () => {
|
|||
issueId,
|
||||
wakeReason: "issue_comment_mentioned",
|
||||
});
|
||||
|
||||
const primaryRuns = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.agentId, primaryAgentId))
|
||||
.orderBy(asc(heartbeatRuns.createdAt));
|
||||
expect(primaryRuns).toHaveLength(2);
|
||||
expect(primaryRuns[0]?.issueCommentStatus).toBe("retry_queued");
|
||||
expect(primaryRuns[1]?.retryOfRunId).toBe(primaryRuns[0]?.id);
|
||||
expect(primaryRuns[1]?.issueCommentStatus).toBe("retry_exhausted");
|
||||
|
||||
const missingCommentRetries = await db
|
||||
.select()
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, companyId),
|
||||
eq(agentWakeupRequests.agentId, primaryAgentId),
|
||||
eq(agentWakeupRequests.reason, "missing_issue_comment"),
|
||||
),
|
||||
);
|
||||
expect(missingCommentRetries).toHaveLength(1);
|
||||
} finally {
|
||||
gateway.releaseFirstWait();
|
||||
await gateway.close();
|
||||
|
|
|
|||
148
server/src/__tests__/heartbeat-local-environment.test.ts
Normal file
148
server/src/__tests__/heartbeat-local-environment.test.ts
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
import { randomUUID } from "node:crypto";
|
||||
import { and, eq } from "drizzle-orm";
|
||||
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
|
||||
import {
|
||||
agents,
|
||||
agentRuntimeState,
|
||||
agentWakeupRequests,
|
||||
activityLog,
|
||||
companies,
|
||||
companySkills,
|
||||
createDb,
|
||||
environmentLeases,
|
||||
environments,
|
||||
heartbeatRunEvents,
|
||||
heartbeatRuns,
|
||||
} from "@paperclipai/db";
|
||||
import {
|
||||
getEmbeddedPostgresTestSupport,
|
||||
startEmbeddedPostgresTestDatabase,
|
||||
} from "./helpers/embedded-postgres.js";
|
||||
import { heartbeatService } from "../services/heartbeat.ts";
|
||||
|
||||
const embeddedPostgresSupport = await getEmbeddedPostgresTestSupport();
|
||||
const describeEmbeddedPostgres = embeddedPostgresSupport.supported ? describe : describe.skip;
|
||||
|
||||
if (!embeddedPostgresSupport.supported) {
|
||||
console.warn(
|
||||
`Skipping embedded Postgres heartbeat environment tests on this host: ${embeddedPostgresSupport.reason ?? "unsupported environment"}`,
|
||||
);
|
||||
}
|
||||
|
||||
async function waitForRunToFinish(
|
||||
heartbeat: ReturnType<typeof heartbeatService>,
|
||||
runId: string,
|
||||
timeoutMs = 5_000,
|
||||
) {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
const run = await heartbeat.getRun(runId);
|
||||
if (run && !["queued", "running"].includes(run.status)) return run;
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
}
|
||||
return await heartbeat.getRun(runId);
|
||||
}
|
||||
|
||||
async function waitForRunLeasesToRelease(
|
||||
db: ReturnType<typeof createDb>,
|
||||
runId: string,
|
||||
timeoutMs = 5_000,
|
||||
) {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
const leases = await db
|
||||
.select()
|
||||
.from(environmentLeases)
|
||||
.where(eq(environmentLeases.heartbeatRunId, runId));
|
||||
if (leases.length > 0 && leases.every((lease) => lease.status !== "active")) return leases;
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
}
|
||||
return await db
|
||||
.select()
|
||||
.from(environmentLeases)
|
||||
.where(eq(environmentLeases.heartbeatRunId, runId));
|
||||
}
|
||||
|
||||
describeEmbeddedPostgres("heartbeat local environment lifecycle", () => {
|
||||
let db!: ReturnType<typeof createDb>;
|
||||
let tempDb: Awaited<ReturnType<typeof startEmbeddedPostgresTestDatabase>> | null = null;
|
||||
|
||||
beforeAll(async () => {
|
||||
tempDb = await startEmbeddedPostgresTestDatabase("heartbeat-local-environment-");
|
||||
db = createDb(tempDb.connectionString);
|
||||
}, 20_000);
|
||||
|
||||
afterEach(async () => {
|
||||
await db.delete(environmentLeases);
|
||||
await db.delete(environments);
|
||||
await db.delete(activityLog);
|
||||
await db.delete(heartbeatRunEvents);
|
||||
await db.delete(heartbeatRuns);
|
||||
await db.delete(agentWakeupRequests);
|
||||
await db.delete(agentRuntimeState);
|
||||
await db.delete(companySkills);
|
||||
await db.delete(agents);
|
||||
await db.delete(companies);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await tempDb?.cleanup();
|
||||
});
|
||||
|
||||
it("runs work through the default Local environment lease", async () => {
|
||||
const companyId = randomUUID();
|
||||
const agentId = randomUUID();
|
||||
const issuePrefix = `T${companyId.replace(/-/g, "").slice(0, 6).toUpperCase()}`;
|
||||
|
||||
await db.insert(companies).values({
|
||||
id: companyId,
|
||||
name: "Paperclip",
|
||||
issuePrefix,
|
||||
requireBoardApprovalForNewAgents: false,
|
||||
});
|
||||
|
||||
await db.insert(agents).values({
|
||||
id: agentId,
|
||||
companyId,
|
||||
name: "ProcessAgent",
|
||||
role: "engineer",
|
||||
status: "idle",
|
||||
adapterType: "process",
|
||||
adapterConfig: {
|
||||
command: process.execPath,
|
||||
args: ["-e", "process.exit(0)"],
|
||||
},
|
||||
runtimeConfig: {},
|
||||
permissions: {},
|
||||
});
|
||||
|
||||
const heartbeat = heartbeatService(db);
|
||||
const queued = await heartbeat.invoke(agentId, "on_demand", {}, "manual");
|
||||
expect(queued).not.toBeNull();
|
||||
|
||||
const finished = await waitForRunToFinish(heartbeat, queued!.id);
|
||||
expect(finished?.status).toBe("succeeded");
|
||||
|
||||
const localRows = await db
|
||||
.select()
|
||||
.from(environments)
|
||||
.where(and(eq(environments.companyId, companyId), eq(environments.driver, "local")));
|
||||
expect(localRows).toHaveLength(1);
|
||||
expect(localRows[0]?.name).toBe("Local");
|
||||
|
||||
const leases = await waitForRunLeasesToRelease(db, queued!.id);
|
||||
expect(leases).toHaveLength(1);
|
||||
expect(leases[0]?.environmentId).toBe(localRows[0]?.id);
|
||||
expect(leases[0]?.status).toBe("released");
|
||||
expect(leases[0]?.provider).toBe("local");
|
||||
expect(leases[0]?.releasedAt).not.toBeNull();
|
||||
|
||||
const context = finished?.contextSnapshot as Record<string, unknown>;
|
||||
expect(context.paperclipEnvironment).toMatchObject({
|
||||
id: localRows[0]?.id,
|
||||
name: "Local",
|
||||
driver: "local",
|
||||
leaseId: leases[0]?.id,
|
||||
});
|
||||
});
|
||||
});
|
||||
316
server/src/services/environments.ts
Normal file
316
server/src/services/environments.ts
Normal file
|
|
@ -0,0 +1,316 @@
|
|||
import { and, desc, eq } from "drizzle-orm";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { environmentLeases, environments } from "@paperclipai/db";
|
||||
import {
|
||||
ENVIRONMENT_DRIVERS,
|
||||
ENVIRONMENT_LEASE_CLEANUP_STATUSES,
|
||||
ENVIRONMENT_LEASE_POLICIES,
|
||||
ENVIRONMENT_LEASE_STATUSES,
|
||||
ENVIRONMENT_STATUSES,
|
||||
type CreateEnvironment,
|
||||
type Environment,
|
||||
type EnvironmentLease,
|
||||
type EnvironmentLeaseCleanupStatus,
|
||||
type EnvironmentLeasePolicy,
|
||||
type EnvironmentLeaseStatus,
|
||||
type UpdateEnvironment,
|
||||
} from "@paperclipai/shared";
|
||||
|
||||
type EnvironmentRow = typeof environments.$inferSelect;
|
||||
type EnvironmentLeaseRow = typeof environmentLeases.$inferSelect;
|
||||
const DEFAULT_LOCAL_ENVIRONMENT_NAME = "Local";
|
||||
const DEFAULT_LOCAL_ENVIRONMENT_DESCRIPTION =
|
||||
"Default execution environment for Paperclip runs on this machine.";
|
||||
|
||||
function cloneRecord(value: unknown, fallback: Record<string, unknown> | null = null): Record<string, unknown> | null {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value)) return fallback;
|
||||
return { ...(value as Record<string, unknown>) };
|
||||
}
|
||||
|
||||
function readEnum<T extends string>(value: string | null, allowed: readonly T[], fieldName: string): T | null {
|
||||
if (value === null) return null;
|
||||
if ((allowed as readonly string[]).includes(value)) return value as T;
|
||||
throw new Error(`Unexpected ${fieldName} value: ${value}`);
|
||||
}
|
||||
|
||||
function toEnvironment(row: EnvironmentRow): Environment {
|
||||
return {
|
||||
id: row.id,
|
||||
companyId: row.companyId,
|
||||
name: row.name,
|
||||
description: row.description ?? null,
|
||||
driver: readEnum(row.driver, ENVIRONMENT_DRIVERS, "environment driver") ?? "local",
|
||||
status: readEnum(row.status, ENVIRONMENT_STATUSES, "environment status") ?? "active",
|
||||
config: cloneRecord(row.config, {}) ?? {},
|
||||
metadata: cloneRecord(row.metadata),
|
||||
createdAt: row.createdAt,
|
||||
updatedAt: row.updatedAt,
|
||||
};
|
||||
}
|
||||
|
||||
function toEnvironmentLease(row: EnvironmentLeaseRow): EnvironmentLease {
|
||||
return {
|
||||
id: row.id,
|
||||
companyId: row.companyId,
|
||||
environmentId: row.environmentId,
|
||||
executionWorkspaceId: row.executionWorkspaceId ?? null,
|
||||
issueId: row.issueId ?? null,
|
||||
heartbeatRunId: row.heartbeatRunId ?? null,
|
||||
status: readEnum(row.status, ENVIRONMENT_LEASE_STATUSES, "environment lease status") ?? "active",
|
||||
leasePolicy: readEnum(row.leasePolicy, ENVIRONMENT_LEASE_POLICIES, "environment lease policy") ?? "ephemeral",
|
||||
provider: row.provider ?? null,
|
||||
providerLeaseId: row.providerLeaseId ?? null,
|
||||
acquiredAt: row.acquiredAt,
|
||||
lastUsedAt: row.lastUsedAt,
|
||||
expiresAt: row.expiresAt ?? null,
|
||||
releasedAt: row.releasedAt ?? null,
|
||||
failureReason: row.failureReason ?? null,
|
||||
cleanupStatus: readEnum(
|
||||
row.cleanupStatus,
|
||||
ENVIRONMENT_LEASE_CLEANUP_STATUSES,
|
||||
"environment lease cleanup status",
|
||||
),
|
||||
metadata: cloneRecord(row.metadata),
|
||||
createdAt: row.createdAt,
|
||||
updatedAt: row.updatedAt,
|
||||
};
|
||||
}
|
||||
|
||||
export function environmentService(db: Db) {
|
||||
return {
|
||||
list: async (
|
||||
companyId: string,
|
||||
filters: {
|
||||
status?: string;
|
||||
driver?: string;
|
||||
} = {},
|
||||
): Promise<Environment[]> => {
|
||||
const conditions = [eq(environments.companyId, companyId)];
|
||||
if (filters.status) conditions.push(eq(environments.status, filters.status));
|
||||
if (filters.driver) conditions.push(eq(environments.driver, filters.driver));
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(environments)
|
||||
.where(and(...conditions))
|
||||
.orderBy(desc(environments.updatedAt), desc(environments.createdAt));
|
||||
return rows.map(toEnvironment);
|
||||
},
|
||||
|
||||
getById: async (id: string): Promise<Environment | null> => {
|
||||
const row = await db.select().from(environments).where(eq(environments.id, id)).then((rows) => rows[0] ?? null);
|
||||
return row ? toEnvironment(row) : null;
|
||||
},
|
||||
|
||||
getLeaseById: async (id: string): Promise<EnvironmentLease | null> => {
|
||||
const row = await db
|
||||
.select()
|
||||
.from(environmentLeases)
|
||||
.where(eq(environmentLeases.id, id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
return row ? toEnvironmentLease(row) : null;
|
||||
},
|
||||
|
||||
ensureLocalEnvironment: async (companyId: string): Promise<Environment> => {
|
||||
const now = new Date();
|
||||
const row = await db
|
||||
.insert(environments)
|
||||
.values({
|
||||
companyId,
|
||||
name: DEFAULT_LOCAL_ENVIRONMENT_NAME,
|
||||
description: DEFAULT_LOCAL_ENVIRONMENT_DESCRIPTION,
|
||||
driver: "local",
|
||||
status: "active",
|
||||
config: {},
|
||||
metadata: {
|
||||
managedByPaperclip: true,
|
||||
defaultForCompany: true,
|
||||
},
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.onConflictDoNothing({
|
||||
target: [environments.companyId, environments.driver],
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (row) return toEnvironment(row);
|
||||
|
||||
const existing = await db
|
||||
.select()
|
||||
.from(environments)
|
||||
.where(and(eq(environments.companyId, companyId), eq(environments.driver, "local")))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!existing) {
|
||||
throw new Error("Failed to ensure local environment");
|
||||
}
|
||||
return toEnvironment(existing);
|
||||
},
|
||||
|
||||
create: async (companyId: string, input: CreateEnvironment): Promise<Environment> => {
|
||||
const now = new Date();
|
||||
const row = await db
|
||||
.insert(environments)
|
||||
.values({
|
||||
companyId,
|
||||
name: input.name,
|
||||
description: input.description ?? null,
|
||||
driver: input.driver,
|
||||
status: input.status ?? "active",
|
||||
config: input.config ?? {},
|
||||
metadata: input.metadata ?? null,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) {
|
||||
throw new Error("Failed to create environment");
|
||||
}
|
||||
return toEnvironment(row);
|
||||
},
|
||||
|
||||
update: async (id: string, patch: UpdateEnvironment): Promise<Environment | null> => {
|
||||
const values: Partial<typeof environments.$inferInsert> = {
|
||||
updatedAt: new Date(),
|
||||
};
|
||||
if (patch.name !== undefined) values.name = patch.name;
|
||||
if (patch.description !== undefined) values.description = patch.description ?? null;
|
||||
if (patch.driver !== undefined) values.driver = patch.driver;
|
||||
if (patch.status !== undefined) values.status = patch.status;
|
||||
if (patch.config !== undefined) values.config = patch.config;
|
||||
if (patch.metadata !== undefined) values.metadata = patch.metadata ?? null;
|
||||
|
||||
const row = await db
|
||||
.update(environments)
|
||||
.set(values)
|
||||
.where(eq(environments.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
return row ? toEnvironment(row) : null;
|
||||
},
|
||||
|
||||
listLeases: async (
|
||||
environmentId: string,
|
||||
filters: {
|
||||
status?: string;
|
||||
} = {},
|
||||
): Promise<EnvironmentLease[]> => {
|
||||
const conditions = [eq(environmentLeases.environmentId, environmentId)];
|
||||
if (filters.status) conditions.push(eq(environmentLeases.status, filters.status));
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(environmentLeases)
|
||||
.where(and(...conditions))
|
||||
.orderBy(desc(environmentLeases.lastUsedAt), desc(environmentLeases.createdAt));
|
||||
return rows.map(toEnvironmentLease);
|
||||
},
|
||||
|
||||
acquireLease: async (input: {
|
||||
companyId: string;
|
||||
environmentId: string;
|
||||
executionWorkspaceId?: string | null;
|
||||
issueId?: string | null;
|
||||
heartbeatRunId?: string | null;
|
||||
leasePolicy?: EnvironmentLeasePolicy;
|
||||
provider?: string | null;
|
||||
providerLeaseId?: string | null;
|
||||
expiresAt?: Date | null;
|
||||
metadata?: Record<string, unknown> | null;
|
||||
}): Promise<EnvironmentLease> => {
|
||||
const now = new Date();
|
||||
const row = await db
|
||||
.insert(environmentLeases)
|
||||
.values({
|
||||
companyId: input.companyId,
|
||||
environmentId: input.environmentId,
|
||||
executionWorkspaceId: input.executionWorkspaceId ?? null,
|
||||
issueId: input.issueId ?? null,
|
||||
heartbeatRunId: input.heartbeatRunId ?? null,
|
||||
status: "active",
|
||||
leasePolicy: input.leasePolicy ?? "ephemeral",
|
||||
provider: input.provider ?? null,
|
||||
providerLeaseId: input.providerLeaseId ?? null,
|
||||
acquiredAt: now,
|
||||
lastUsedAt: now,
|
||||
expiresAt: input.expiresAt ?? null,
|
||||
releasedAt: null,
|
||||
failureReason: null,
|
||||
cleanupStatus: null,
|
||||
metadata: input.metadata ?? null,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) {
|
||||
throw new Error("Failed to acquire environment lease");
|
||||
}
|
||||
return toEnvironmentLease(row);
|
||||
},
|
||||
|
||||
releaseLease: async (
|
||||
id: string,
|
||||
status: Extract<EnvironmentLeaseStatus, "released" | "expired" | "failed"> = "released",
|
||||
options?: {
|
||||
failureReason?: string;
|
||||
cleanupStatus?: EnvironmentLeaseCleanupStatus;
|
||||
},
|
||||
) => {
|
||||
const now = new Date();
|
||||
const row = await db
|
||||
.update(environmentLeases)
|
||||
.set({
|
||||
status,
|
||||
releasedAt: now,
|
||||
lastUsedAt: now,
|
||||
updatedAt: now,
|
||||
...(options?.failureReason !== undefined ? { failureReason: options.failureReason } : {}),
|
||||
...(options?.cleanupStatus !== undefined ? { cleanupStatus: options.cleanupStatus } : {}),
|
||||
})
|
||||
.where(eq(environmentLeases.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
return row ? toEnvironmentLease(row) : null;
|
||||
},
|
||||
|
||||
updateLeaseMetadata: async (
|
||||
id: string,
|
||||
metadata: Record<string, unknown> | null,
|
||||
): Promise<EnvironmentLease | null> => {
|
||||
const row = await db
|
||||
.update(environmentLeases)
|
||||
.set({
|
||||
metadata,
|
||||
lastUsedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(environmentLeases.id, id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
return row ? toEnvironmentLease(row) : null;
|
||||
},
|
||||
|
||||
releaseLeasesForRun: async (
|
||||
heartbeatRunId: string,
|
||||
status: Extract<EnvironmentLeaseStatus, "released" | "expired" | "failed"> = "released",
|
||||
): Promise<EnvironmentLease[]> => {
|
||||
const now = new Date();
|
||||
const rows = await db
|
||||
.update(environmentLeases)
|
||||
.set({
|
||||
status,
|
||||
releasedAt: now,
|
||||
lastUsedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(environmentLeases.heartbeatRunId, heartbeatRunId),
|
||||
eq(environmentLeases.status, "active"),
|
||||
),
|
||||
)
|
||||
.returning();
|
||||
return rows.map(toEnvironmentLease);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ import {
|
|||
AGENT_DEFAULT_MAX_CONCURRENT_RUNS,
|
||||
ISSUE_CONTINUATION_SUMMARY_DOCUMENT_KEY,
|
||||
type BillingType,
|
||||
type EnvironmentLeaseStatus,
|
||||
type ExecutionWorkspace,
|
||||
type ExecutionWorkspaceConfig,
|
||||
type RunLivenessState,
|
||||
|
|
@ -84,6 +85,7 @@ import {
|
|||
refreshIssueContinuationSummary,
|
||||
} from "./issue-continuation-summary.js";
|
||||
import { executionWorkspaceService, mergeExecutionWorkspaceConfig } from "./execution-workspaces.js";
|
||||
import { environmentService } from "./environments.js";
|
||||
import { workspaceOperationService } from "./workspace-operations.js";
|
||||
import { isProcessGroupAlive, terminateLocalService } from "./local-service-supervisor.js";
|
||||
import {
|
||||
|
|
@ -122,6 +124,10 @@ const MAX_RUN_EVENT_PAYLOAD_OBJECT_KEYS = 100;
|
|||
const MAX_RUN_EVENT_PAYLOAD_DEPTH = 6;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = AGENT_DEFAULT_MAX_CONCURRENT_RUNS;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
|
||||
const LIVENESS_BOOKKEEPING_ACTIVITY_ACTIONS = [
|
||||
"environment.lease_acquired",
|
||||
"environment.lease_released",
|
||||
];
|
||||
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
|
||||
const WAKE_COMMENT_IDS_KEY = "wakeCommentIds";
|
||||
const PAPERCLIP_WAKE_PAYLOAD_KEY = "paperclipWake";
|
||||
|
|
@ -306,6 +312,12 @@ async function resolveRunScopedMentionedSkillKeys(input: {
|
|||
.filter((skillKey): skillKey is string => Boolean(skillKey));
|
||||
}
|
||||
|
||||
function leaseReleaseStatusForRunStatus(
|
||||
status: string | null | undefined,
|
||||
): Extract<EnvironmentLeaseStatus, "released" | "expired" | "failed"> {
|
||||
return status === "failed" || status === "timed_out" ? "failed" : "released";
|
||||
}
|
||||
|
||||
export function applyPersistedExecutionWorkspaceConfig(input: {
|
||||
config: Record<string, unknown>;
|
||||
workspaceConfig: ExecutionWorkspaceConfig | null;
|
||||
|
|
@ -1832,6 +1844,7 @@ export function heartbeatService(db: Db) {
|
|||
const companySkills = companySkillService(db);
|
||||
const issuesSvc = issueService(db);
|
||||
const executionWorkspacesSvc = executionWorkspaceService(db);
|
||||
const environmentsSvc = environmentService(db);
|
||||
const workspaceOperationsSvc = workspaceOperationService(db);
|
||||
const activeRunExecutions = new Set<string>();
|
||||
const budgetHooks = {
|
||||
|
|
@ -2991,6 +3004,26 @@ export function heartbeatService(db: Db) {
|
|||
return retryRun;
|
||||
}
|
||||
|
||||
async function hasDeferredIssueCommentWake(companyId: string, issueId: string, agentId: string) {
|
||||
const deferredPayloads = await db
|
||||
.select({ payload: agentWakeupRequests.payload })
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, companyId),
|
||||
eq(agentWakeupRequests.agentId, agentId),
|
||||
eq(agentWakeupRequests.status, "deferred_issue_execution"),
|
||||
sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issueId}`,
|
||||
),
|
||||
);
|
||||
|
||||
return deferredPayloads.some(({ payload }) => {
|
||||
const parsedPayload = parseObject(payload);
|
||||
const deferredContext = parseObject(parsedPayload[DEFERRED_WAKE_CONTEXT_KEY]);
|
||||
return Boolean(deriveCommentId(deferredContext, parsedPayload));
|
||||
});
|
||||
}
|
||||
|
||||
async function finalizeIssueCommentPolicy(
|
||||
run: typeof heartbeatRuns.$inferSelect,
|
||||
agent: typeof agents.$inferSelect,
|
||||
|
|
@ -3043,6 +3076,21 @@ export function heartbeatService(db: Db) {
|
|||
return { outcome: "not_applicable" as const, queuedRun: null };
|
||||
}
|
||||
|
||||
if (await hasDeferredIssueCommentWake(run.companyId, issueId, run.agentId)) {
|
||||
await patchRunIssueCommentStatus(run.id, {
|
||||
issueCommentStatus: "not_applicable",
|
||||
issueCommentSatisfiedByCommentId: null,
|
||||
issueCommentRetryQueuedAt: null,
|
||||
});
|
||||
await appendRunEvent(run, await nextRunEventSeq(run.id), {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
level: "info",
|
||||
message: "Run ended without an issue comment; a deferred comment wake already exists for this issue",
|
||||
});
|
||||
return { outcome: "not_applicable" as const, queuedRun: null };
|
||||
}
|
||||
|
||||
const queuedRun = await enqueueMissingIssueCommentRetry(run, agent, issueId);
|
||||
if (queuedRun) {
|
||||
await appendRunEvent(run, await nextRunEventSeq(run.id), {
|
||||
|
|
@ -3698,7 +3746,13 @@ export function heartbeatService(db: Db) {
|
|||
latestAt: sql<Date | null>`max(${activityLog.createdAt})`,
|
||||
})
|
||||
.from(activityLog)
|
||||
.where(and(eq(activityLog.companyId, run.companyId), eq(activityLog.runId, run.id)));
|
||||
.where(
|
||||
and(
|
||||
eq(activityLog.companyId, run.companyId),
|
||||
eq(activityLog.runId, run.id),
|
||||
notInArray(activityLog.action, LIVENESS_BOOKKEEPING_ACTIVITY_ACTIONS),
|
||||
),
|
||||
);
|
||||
|
||||
const [eventStats] = await db
|
||||
.select({
|
||||
|
|
@ -5215,6 +5269,47 @@ export function heartbeatService(db: Db) {
|
|||
})(),
|
||||
};
|
||||
context.paperclipWorkspaces = resolvedWorkspace.workspaceHints;
|
||||
const localEnvironment = await environmentsSvc.ensureLocalEnvironment(agent.companyId);
|
||||
const environmentLease = await environmentsSvc.acquireLease({
|
||||
companyId: agent.companyId,
|
||||
environmentId: localEnvironment.id,
|
||||
executionWorkspaceId: persistedExecutionWorkspace?.id ?? null,
|
||||
issueId: issueId ?? null,
|
||||
heartbeatRunId: run.id,
|
||||
leasePolicy: "ephemeral",
|
||||
provider: "local",
|
||||
metadata: {
|
||||
driver: "local",
|
||||
executionWorkspaceMode: persistedExecutionWorkspace?.mode ?? effectiveExecutionWorkspaceMode,
|
||||
cwd: executionWorkspace.cwd,
|
||||
},
|
||||
});
|
||||
context.paperclipEnvironment = {
|
||||
id: localEnvironment.id,
|
||||
name: localEnvironment.name,
|
||||
driver: localEnvironment.driver,
|
||||
leaseId: environmentLease.id,
|
||||
};
|
||||
await logActivity(db, {
|
||||
companyId: agent.companyId,
|
||||
actorType: "agent",
|
||||
actorId: agent.id,
|
||||
agentId: agent.id,
|
||||
runId: run.id,
|
||||
action: "environment.lease_acquired",
|
||||
entityType: "environment_lease",
|
||||
entityId: environmentLease.id,
|
||||
details: {
|
||||
environmentId: localEnvironment.id,
|
||||
driver: localEnvironment.driver,
|
||||
leasePolicy: environmentLease.leasePolicy,
|
||||
provider: environmentLease.provider,
|
||||
executionWorkspaceId: environmentLease.executionWorkspaceId,
|
||||
issueId,
|
||||
},
|
||||
}).catch((err) => {
|
||||
logger.warn({ err, runId: run.id }, "failed to log environment lease acquisition");
|
||||
});
|
||||
const runtimeServiceIntents = (() => {
|
||||
const runtimeConfig = parseObject(resolvedConfig.workspaceRuntime);
|
||||
return Array.isArray(runtimeConfig.services)
|
||||
|
|
@ -5231,6 +5326,13 @@ export function heartbeatService(db: Db) {
|
|||
if (executionWorkspace.projectId && !readNonEmptyString(context.projectId)) {
|
||||
context.projectId = executionWorkspace.projectId;
|
||||
}
|
||||
await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
contextSnapshot: context,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(heartbeatRuns.id, run.id));
|
||||
const runtimeSessionFallback = taskKey || resetTaskSession ? null : runtime.sessionId;
|
||||
let previousSessionDisplayId = truncateDisplayId(
|
||||
explicitResumeSessionDisplayId ??
|
||||
|
|
@ -5823,6 +5925,34 @@ export function heartbeatService(db: Db) {
|
|||
// DB calls threw (e.g. a transient DB error in finalizeAgentStatus).
|
||||
await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined);
|
||||
} finally {
|
||||
const latestRun = await getRun(run.id).catch(() => null);
|
||||
const releasedLeases = await environmentsSvc
|
||||
.releaseLeasesForRun(run.id, leaseReleaseStatusForRunStatus(latestRun?.status))
|
||||
.catch((err) => {
|
||||
logger.warn({ err, runId: run.id }, "failed to release environment leases for heartbeat run");
|
||||
return [];
|
||||
});
|
||||
for (const lease of releasedLeases) {
|
||||
await logActivity(db, {
|
||||
companyId: run.companyId,
|
||||
actorType: "agent",
|
||||
actorId: run.agentId,
|
||||
agentId: run.agentId,
|
||||
runId: run.id,
|
||||
action: "environment.lease_released",
|
||||
entityType: "environment_lease",
|
||||
entityId: lease.id,
|
||||
details: {
|
||||
environmentId: lease.environmentId,
|
||||
driver: lease.metadata?.driver ?? "local",
|
||||
leasePolicy: lease.leasePolicy,
|
||||
provider: lease.provider,
|
||||
executionWorkspaceId: lease.executionWorkspaceId,
|
||||
issueId: lease.issueId,
|
||||
status: lease.status,
|
||||
},
|
||||
}).catch(() => undefined);
|
||||
}
|
||||
await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
|
||||
activeRunExecutions.delete(run.id);
|
||||
await startNextQueuedRunForAgent(run.agentId);
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ export { boardAuthService } from "./board-auth.js";
|
|||
export { instanceSettingsService } from "./instance-settings.js";
|
||||
export { companyPortabilityService } from "./company-portability.js";
|
||||
export { executionWorkspaceService } from "./execution-workspaces.js";
|
||||
export { environmentService } from "./environments.js";
|
||||
export { workspaceOperationService } from "./workspace-operations.js";
|
||||
export { workProductService } from "./work-products.js";
|
||||
export { logActivity, type LogActivityInput } from "./activity-log.js";
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue