Add idempotent local dev service management

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-03-28 15:42:44 -05:00
parent cadfcd1bc6
commit 6793dde597
8 changed files with 1448 additions and 35 deletions

View file

@ -10,6 +10,16 @@ import { workspaceRuntimeServices } from "@paperclipai/db";
import { and, desc, eq, inArray } from "drizzle-orm";
import { asNumber, asString, parseObject, renderTemplate } from "../adapters/utils.js";
import { resolveHomeAwarePath } from "../home-paths.js";
import {
createLocalServiceKey,
findLocalServiceRegistryRecordByRuntimeServiceId,
findAdoptableLocalService,
readLocalServicePortOwner,
removeLocalServiceRegistryRecord,
terminateLocalService,
touchLocalServiceRegistryRecord,
writeLocalServiceRegistryRecord,
} from "./local-service-supervisor.js";
import type { WorkspaceOperationRecorder } from "./workspace-operations.js";
export interface ExecutionWorkspaceInput {
@ -77,12 +87,24 @@ interface RuntimeServiceRecord extends RuntimeServiceRef {
leaseRunIds: Set<string>;
idleTimer: ReturnType<typeof globalThis.setTimeout> | null;
envFingerprint: string;
serviceKey: string;
profileKind: string;
processGroupId: number | null;
}
const runtimeServicesById = new Map<string, RuntimeServiceRecord>();
const runtimeServicesByReuseKey = new Map<string, string>();
const runtimeServiceLeasesByRun = new Map<string, string[]>();
export async function resetRuntimeServicesForTests() {
for (const record of runtimeServicesById.values()) {
clearIdleTimer(record);
}
runtimeServicesById.clear();
runtimeServicesByReuseKey.clear();
runtimeServiceLeasesByRun.clear();
}
function stableStringify(value: unknown): string {
if (Array.isArray(value)) {
return `[${value.map((entry) => stableStringify(entry)).join(",")}]`;
@ -1101,8 +1123,17 @@ async function startLocalRuntimeService(input: {
if (!command) throw new Error(`Runtime service "${serviceName}" is missing command`);
const serviceCwdTemplate = asString(input.service.cwd, ".");
const portConfig = parseObject(input.service.port);
const port = asString(portConfig.type, "") === "auto" ? await allocatePort() : null;
const envConfig = parseObject(input.service.env);
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
const serviceIdentityFingerprint = input.reuseKey ?? envFingerprint;
const explicitPort = asNumber(portConfig.value, asNumber(input.service.port, 0));
const identityPort = explicitPort > 0 ? explicitPort : null;
const port =
asString(portConfig.type, "") === "auto"
? await allocatePort()
: explicitPort > 0
? explicitPort
: null;
const templateData = buildTemplateData({
workspace: input.workspace,
agent: input.agent,
@ -1124,6 +1155,80 @@ async function startLocalRuntimeService(input: {
const portEnvKey = asString(portConfig.envKey, "PORT");
env[portEnvKey] = String(port);
}
const expose = parseObject(input.service.expose);
const readiness = parseObject(input.service.readiness);
const urlTemplate =
asString(expose.urlTemplate, "") ||
asString(readiness.urlTemplate, "");
const url = urlTemplate ? renderTemplate(urlTemplate, templateData) : null;
const stopPolicy = parseObject(input.service.stopPolicy);
const serviceKey = createLocalServiceKey({
profileKind: "workspace-runtime",
serviceName,
cwd: serviceCwd,
command,
envFingerprint: serviceIdentityFingerprint,
port: identityPort,
scope: {
scopeType: input.scopeType,
scopeId: input.scopeId,
executionWorkspaceId: input.executionWorkspaceId ?? null,
reuseKey: input.reuseKey,
},
});
const adoptedRecord = await findAdoptableLocalService({
serviceKey,
command,
cwd: serviceCwd,
envFingerprint: serviceIdentityFingerprint,
port: identityPort,
});
if (adoptedRecord) {
return {
id: adoptedRecord.runtimeServiceId ?? randomUUID(),
companyId: input.agent.companyId,
projectId: input.workspace.projectId,
projectWorkspaceId: input.workspace.workspaceId,
executionWorkspaceId: input.executionWorkspaceId ?? null,
issueId: input.issue?.id ?? null,
serviceName,
status: "running",
lifecycle,
scopeType: input.scopeType,
scopeId: input.scopeId,
reuseKey: input.reuseKey,
command,
cwd: serviceCwd,
port: adoptedRecord.port ?? port,
url: adoptedRecord.url ?? url,
provider: "local_process",
providerRef: String(adoptedRecord.pid),
ownerAgentId: input.agent.id,
startedByRunId: input.runId,
lastUsedAt: new Date().toISOString(),
startedAt: adoptedRecord.startedAt,
stoppedAt: null,
stopPolicy,
healthStatus: "healthy",
reused: true,
db: input.db,
child: null,
leaseRunIds: new Set([input.runId]),
idleTimer: null,
envFingerprint,
serviceKey,
profileKind: "workspace-runtime",
processGroupId: adoptedRecord.processGroupId ?? null,
};
}
if (identityPort) {
const ownerPid = await readLocalServicePortOwner(identityPort);
if (ownerPid) {
throw new Error(
`Runtime service "${serviceName}" could not start because port ${identityPort} is already in use by pid ${ownerPid}`,
);
}
}
const shell = process.env.SHELL?.trim() || "/bin/sh";
const child = spawn(shell, ["-lc", command], {
cwd: serviceCwd,
@ -1144,13 +1249,6 @@ async function startLocalRuntimeService(input: {
if (input.onLog) await input.onLog("stderr", `[service:${serviceName}] ${text}`);
});
const expose = parseObject(input.service.expose);
const readiness = parseObject(input.service.readiness);
const urlTemplate =
asString(expose.urlTemplate, "") ||
asString(readiness.urlTemplate, "");
const url = urlTemplate ? renderTemplate(urlTemplate, templateData) : null;
try {
await waitForReadiness({ service: input.service, url });
} catch (err) {
@ -1160,8 +1258,7 @@ async function startLocalRuntimeService(input: {
);
}
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
return {
const record: RuntimeServiceRecord = {
id: randomUUID(),
companyId: input.agent.companyId,
projectId: input.workspace.projectId,
@ -1185,7 +1282,7 @@ async function startLocalRuntimeService(input: {
lastUsedAt: new Date().toISOString(),
startedAt: new Date().toISOString(),
stoppedAt: null,
stopPolicy: parseObject(input.service.stopPolicy),
stopPolicy,
healthStatus: "healthy",
reused: false,
db: input.db,
@ -1193,7 +1290,41 @@ async function startLocalRuntimeService(input: {
leaseRunIds: new Set([input.runId]),
idleTimer: null,
envFingerprint,
serviceKey,
profileKind: "workspace-runtime",
processGroupId: child.pid ?? null,
};
if (child.pid) {
await writeLocalServiceRegistryRecord({
version: 1,
serviceKey,
profileKind: "workspace-runtime",
serviceName,
command,
cwd: serviceCwd,
envFingerprint: serviceIdentityFingerprint,
port,
url,
pid: child.pid,
processGroupId: child.pid,
provider: "local_process",
runtimeServiceId: record.id,
reuseKey: input.reuseKey,
startedAt: record.startedAt,
lastSeenAt: record.lastUsedAt,
metadata: {
projectId: record.projectId,
projectWorkspaceId: record.projectWorkspaceId,
executionWorkspaceId: record.executionWorkspaceId,
issueId: record.issueId,
scopeType: record.scopeType,
scopeId: record.scopeId,
},
});
}
return record;
}
function scheduleIdleStop(record: RuntimeServiceRecord) {
@ -1215,11 +1346,20 @@ async function stopRuntimeService(serviceId: string) {
record.stoppedAt = new Date().toISOString();
if (record.child && record.child.pid) {
terminateChildProcess(record.child);
} else if (record.providerRef) {
const pid = Number.parseInt(record.providerRef, 10);
if (Number.isInteger(pid) && pid > 0) {
await terminateLocalService({
pid,
processGroupId: record.processGroupId,
});
}
}
runtimeServicesById.delete(serviceId);
if (record.reuseKey) {
runtimeServicesByReuseKey.delete(record.reuseKey);
}
await removeLocalServiceRegistryRecord(record.serviceKey);
await persistRuntimeServiceRecord(record.db, record);
}
@ -1264,6 +1404,7 @@ function registerRuntimeService(db: Db | undefined, record: RuntimeServiceRecord
if (current.reuseKey && runtimeServicesByReuseKey.get(current.reuseKey) === current.id) {
runtimeServicesByReuseKey.delete(current.reuseKey);
}
void removeLocalServiceRegistryRecord(current.serviceKey);
void persistRuntimeServiceRecord(db, current);
});
}
@ -1314,6 +1455,10 @@ export async function ensureRuntimeServicesForRun(input: {
existing.lastUsedAt = new Date().toISOString();
existing.stoppedAt = null;
clearIdleTimer(existing);
void touchLocalServiceRegistryRecord(existing.serviceKey, {
runtimeServiceId: existing.id,
lastSeenAt: existing.lastUsedAt,
});
await persistRuntimeServiceRecord(input.db, existing);
acquiredServiceIds.push(existing.id);
refs.push(toRuntimeServiceRef(existing, { reused: true }));
@ -1426,8 +1571,8 @@ export async function listWorkspaceRuntimeServicesForProjectWorkspaces(
}
export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) {
const staleRows = await db
.select({ id: workspaceRuntimeServices.id })
const rows = await db
.select()
.from(workspaceRuntimeServices)
.where(
and(
@ -1436,26 +1581,84 @@ export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) {
),
);
if (staleRows.length === 0) return { reconciled: 0 };
if (rows.length === 0) return { reconciled: 0, adopted: 0, stopped: 0 };
const now = new Date();
await db
.update(workspaceRuntimeServices)
.set({
status: "stopped",
healthStatus: "unknown",
stoppedAt: now,
lastUsedAt: now,
updatedAt: now,
})
.where(
and(
eq(workspaceRuntimeServices.provider, "local_process"),
inArray(workspaceRuntimeServices.status, ["starting", "running"]),
),
);
let adopted = 0;
let stopped = 0;
for (const row of rows) {
const adoptedRecord = await findLocalServiceRegistryRecordByRuntimeServiceId({
runtimeServiceId: row.id,
profileKind: "workspace-runtime",
});
if (adoptedRecord) {
const record: RuntimeServiceRecord = {
id: row.id,
companyId: row.companyId,
projectId: row.projectId ?? null,
projectWorkspaceId: row.projectWorkspaceId ?? null,
executionWorkspaceId: row.executionWorkspaceId ?? null,
issueId: row.issueId ?? null,
serviceName: row.serviceName,
status: "running",
lifecycle: row.lifecycle as RuntimeServiceRecord["lifecycle"],
scopeType: row.scopeType as RuntimeServiceRecord["scopeType"],
scopeId: row.scopeId ?? null,
reuseKey: row.reuseKey ?? null,
command: row.command ?? null,
cwd: row.cwd ?? null,
port: adoptedRecord.port ?? row.port ?? null,
url: adoptedRecord.url ?? row.url ?? null,
provider: "local_process",
providerRef: String(adoptedRecord.pid),
ownerAgentId: row.ownerAgentId ?? null,
startedByRunId: row.startedByRunId ?? null,
lastUsedAt: new Date().toISOString(),
startedAt: row.startedAt.toISOString(),
stoppedAt: null,
stopPolicy: (row.stopPolicy as Record<string, unknown> | null) ?? null,
healthStatus: "healthy",
reused: true,
db,
child: null,
leaseRunIds: new Set(),
idleTimer: null,
envFingerprint: row.reuseKey ?? "",
serviceKey: adoptedRecord.serviceKey,
profileKind: "workspace-runtime",
processGroupId: adoptedRecord.processGroupId ?? null,
};
registerRuntimeService(db, record);
await touchLocalServiceRegistryRecord(adoptedRecord.serviceKey, {
runtimeServiceId: row.id,
lastSeenAt: record.lastUsedAt,
});
await persistRuntimeServiceRecord(db, record);
adopted += 1;
continue;
}
return { reconciled: staleRows.length };
const now = new Date();
await db
.update(workspaceRuntimeServices)
.set({
status: "stopped",
healthStatus: "unknown",
stoppedAt: now,
lastUsedAt: now,
updatedAt: now,
})
.where(eq(workspaceRuntimeServices.id, row.id));
const registryRecord = await findLocalServiceRegistryRecordByRuntimeServiceId({
runtimeServiceId: row.id,
profileKind: "workspace-runtime",
});
if (registryRecord) {
await removeLocalServiceRegistryRecord(registryRecord.serviceKey);
}
stopped += 1;
}
return { reconciled: rows.length, adopted, stopped };
}
export async function persistAdapterManagedRuntimeServices(input: {