Add workspace runtime controls

Expose project and execution workspace runtime defaults, control endpoints, startup recovery, and operator UI for start/stop/restart flows.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta 2026-03-28 16:46:43 -05:00
parent f1ad07616c
commit 1f1fe9c989
25 changed files with 1133 additions and 51 deletions

View file

@ -6,7 +6,7 @@ import path from "node:path";
import { setTimeout as delay } from "node:timers/promises";
import type { AdapterRuntimeServiceReport } from "@paperclipai/adapter-utils";
import type { Db } from "@paperclipai/db";
import { workspaceRuntimeServices } from "@paperclipai/db";
import { executionWorkspaces, projectWorkspaces, workspaceRuntimeServices } from "@paperclipai/db";
import { and, desc, eq, inArray } from "drizzle-orm";
import { asNumber, asString, parseObject, renderTemplate } from "../adapters/utils.js";
import { resolveHomeAwarePath } from "../home-paths.js";
@ -21,6 +21,8 @@ import {
writeLocalServiceRegistryRecord,
} from "./local-service-supervisor.js";
import type { WorkspaceOperationRecorder } from "./workspace-operations.js";
import { readExecutionWorkspaceConfig } from "./execution-workspaces.js";
import { readProjectWorkspaceRuntimeConfig } from "./project-workspace-runtime-config.js";
export interface ExecutionWorkspaceInput {
baseCwd: string;
@ -38,7 +40,7 @@ export interface ExecutionWorkspaceIssueRef {
}
export interface ExecutionWorkspaceAgentRef {
id: string;
id: string | null;
name: string;
companyId: string;
}
@ -211,7 +213,7 @@ function renderWorkspaceTemplate(template: string, input: {
title: input.issue?.title ?? "",
},
agent: {
id: input.agent.id,
id: input.agent.id ?? "",
name: input.agent.name,
},
project: {
@ -334,7 +336,7 @@ function buildWorkspaceCommandEnv(input: {
env.PAPERCLIP_WORKSPACE_CREATED = input.created ? "true" : "false";
env.PAPERCLIP_PROJECT_ID = input.base.projectId ?? "";
env.PAPERCLIP_PROJECT_WORKSPACE_ID = input.base.workspaceId ?? "";
env.PAPERCLIP_AGENT_ID = input.agent.id;
env.PAPERCLIP_AGENT_ID = input.agent.id ?? "";
env.PAPERCLIP_AGENT_NAME = input.agent.name;
env.PAPERCLIP_COMPANY_ID = input.agent.companyId;
env.PAPERCLIP_ISSUE_ID = input.issue?.id ?? "";
@ -903,7 +905,7 @@ function buildTemplateData(input: {
title: input.issue?.title ?? "",
},
agent: {
id: input.agent.id,
id: input.agent.id ?? "",
name: input.agent.name,
},
port: input.port ?? "",
@ -1091,7 +1093,7 @@ export function normalizeAdapterManagedRuntimeServices(input: {
url: report.url ?? null,
provider: "adapter_managed",
providerRef: report.providerRef ?? null,
ownerAgentId: report.ownerAgentId ?? input.agent.id,
ownerAgentId: report.ownerAgentId ?? input.agent.id ?? null,
startedByRunId: input.runId,
lastUsedAt: nowIso,
startedAt: nowIso,
@ -1203,7 +1205,7 @@ async function startLocalRuntimeService(input: {
url: adoptedRecord.url ?? url,
provider: "local_process",
providerRef: String(adoptedRecord.pid),
ownerAgentId: input.agent.id,
ownerAgentId: input.agent.id ?? null,
startedByRunId: input.runId,
lastUsedAt: new Date().toISOString(),
startedAt: adoptedRecord.startedAt,
@ -1277,7 +1279,7 @@ async function startLocalRuntimeService(input: {
url,
provider: "local_process",
providerRef: child.pid ? String(child.pid) : null,
ownerAgentId: input.agent.id,
ownerAgentId: input.agent.id ?? null,
startedByRunId: input.runId,
lastUsedAt: new Date().toISOString(),
startedAt: new Date().toISOString(),
@ -1345,7 +1347,10 @@ async function stopRuntimeService(serviceId: string) {
record.lastUsedAt = new Date().toISOString();
record.stoppedAt = new Date().toISOString();
if (record.child && record.child.pid) {
terminateChildProcess(record.child);
await terminateLocalService({
pid: record.child.pid,
processGroupId: record.processGroupId ?? record.child.pid,
});
} else if (record.providerRef) {
const pid = Number.parseInt(record.providerRef, 10);
if (Number.isInteger(pid) && pid > 0) {
@ -1409,6 +1414,13 @@ function registerRuntimeService(db: Db | undefined, record: RuntimeServiceRecord
});
}
function readRuntimeServiceEntries(config: Record<string, unknown>) {
const runtime = parseObject(config.workspaceRuntime);
return Array.isArray(runtime.services)
? runtime.services.filter((entry): entry is Record<string, unknown> => typeof entry === "object" && entry !== null)
: [];
}
export async function ensureRuntimeServicesForRun(input: {
db?: Db;
runId: string;
@ -1420,10 +1432,7 @@ export async function ensureRuntimeServicesForRun(input: {
adapterEnv: Record<string, string>;
onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
}): Promise<RuntimeServiceRef[]> {
const runtime = parseObject(input.config.workspaceRuntime);
const rawServices = Array.isArray(runtime.services)
? runtime.services.filter((entry): entry is Record<string, unknown> => typeof entry === "object" && entry !== null)
: [];
const rawServices = readRuntimeServiceEntries(input.config);
const acquiredServiceIds: string[] = [];
const refs: RuntimeServiceRef[] = [];
runtimeServiceLeasesByRun.set(input.runId, acquiredServiceIds);
@ -1493,6 +1502,79 @@ export async function ensureRuntimeServicesForRun(input: {
return refs;
}
export async function startRuntimeServicesForWorkspaceControl(input: {
db?: Db;
invocationId?: string;
actor: ExecutionWorkspaceAgentRef;
issue: ExecutionWorkspaceIssueRef | null;
workspace: RealizedExecutionWorkspace;
executionWorkspaceId?: string | null;
config: Record<string, unknown>;
adapterEnv: Record<string, string>;
onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
}): Promise<RuntimeServiceRef[]> {
const rawServices = readRuntimeServiceEntries(input.config);
const refs: RuntimeServiceRef[] = [];
const invocationId = input.invocationId ?? randomUUID();
for (const service of rawServices) {
const lifecycle = asString(service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
const { scopeType, scopeId } = resolveServiceScopeId({
service,
workspace: input.workspace,
executionWorkspaceId: input.executionWorkspaceId,
issue: input.issue,
runId: invocationId,
agent: input.actor,
});
const envConfig = parseObject(service.env);
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
const serviceName = asString(service.name, "service");
const reuseKey =
lifecycle === "shared"
? [scopeType, scopeId ?? "", serviceName, envFingerprint].join(":")
: null;
if (reuseKey) {
const existingId = runtimeServicesByReuseKey.get(reuseKey);
const existing = existingId ? runtimeServicesById.get(existingId) : null;
if (existing && existing.status === "running") {
existing.lastUsedAt = new Date().toISOString();
existing.stoppedAt = null;
clearIdleTimer(existing);
void touchLocalServiceRegistryRecord(existing.serviceKey, {
runtimeServiceId: existing.id,
lastSeenAt: existing.lastUsedAt,
});
await persistRuntimeServiceRecord(input.db, existing);
refs.push(toRuntimeServiceRef(existing, { reused: true }));
continue;
}
}
const record = await startLocalRuntimeService({
db: input.db,
runId: invocationId,
agent: input.actor,
issue: input.issue,
workspace: input.workspace,
executionWorkspaceId: input.executionWorkspaceId,
adapterEnv: input.adapterEnv,
service,
onLog: input.onLog,
reuseKey,
scopeType,
scopeId,
});
record.startedByRunId = null;
registerRuntimeService(input.db, record);
await persistRuntimeServiceRecord(input.db, record);
refs.push(toRuntimeServiceRef(record));
}
return refs;
}
export async function releaseRuntimeServicesForRun(runId: string) {
const acquired = runtimeServiceLeasesByRun.get(runId) ?? [];
runtimeServiceLeasesByRun.delete(runId);
@ -1543,6 +1625,39 @@ export async function stopRuntimeServicesForExecutionWorkspace(input: {
}
}
export async function stopRuntimeServicesForProjectWorkspace(input: {
db?: Db;
projectWorkspaceId: string;
}) {
const matchingServiceIds = Array.from(runtimeServicesById.values())
.filter((record) => record.projectWorkspaceId === input.projectWorkspaceId && record.scopeType === "project_workspace")
.map((record) => record.id);
for (const serviceId of matchingServiceIds) {
await stopRuntimeService(serviceId);
}
if (input.db) {
const now = new Date();
await input.db
.update(workspaceRuntimeServices)
.set({
status: "stopped",
healthStatus: "unknown",
stoppedAt: now,
lastUsedAt: now,
updatedAt: now,
})
.where(
and(
eq(workspaceRuntimeServices.projectWorkspaceId, input.projectWorkspaceId),
eq(workspaceRuntimeServices.scopeType, "project_workspace"),
inArray(workspaceRuntimeServices.status, ["starting", "running"]),
),
);
}
}
export async function listWorkspaceRuntimeServicesForProjectWorkspaces(
db: Db,
companyId: string,
@ -1556,6 +1671,7 @@ export async function listWorkspaceRuntimeServicesForProjectWorkspaces(
and(
eq(workspaceRuntimeServices.companyId, companyId),
inArray(workspaceRuntimeServices.projectWorkspaceId, projectWorkspaceIds),
eq(workspaceRuntimeServices.scopeType, "project_workspace"),
),
)
.orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));
@ -1661,6 +1777,93 @@ export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) {
return { reconciled: rows.length, adopted, stopped };
}
export async function restartDesiredRuntimeServicesOnStartup(db: Db) {
let restarted = 0;
let failed = 0;
const projectWorkspaceRows = await db
.select()
.from(projectWorkspaces);
for (const row of projectWorkspaceRows) {
const runtimeConfig = readProjectWorkspaceRuntimeConfig((row.metadata as Record<string, unknown> | null) ?? null);
if (runtimeConfig?.desiredState !== "running" || !runtimeConfig.workspaceRuntime || !row.cwd) continue;
try {
const refs = await startRuntimeServicesForWorkspaceControl({
db,
actor: { id: null, name: "Paperclip", companyId: row.companyId },
issue: null,
workspace: {
baseCwd: row.cwd,
source: "project_primary",
projectId: row.projectId,
workspaceId: row.id,
repoUrl: row.repoUrl ?? null,
repoRef: row.repoRef ?? null,
strategy: "project_primary",
cwd: row.cwd,
branchName: row.defaultRef ?? row.repoRef ?? null,
worktreePath: null,
warnings: [],
created: false,
},
config: { workspaceRuntime: runtimeConfig.workspaceRuntime },
adapterEnv: {},
});
if (refs.length > 0) restarted += refs.filter((ref) => !ref.reused).length;
} catch {
failed += 1;
}
}
const executionWorkspaceRows = await db
.select()
.from(executionWorkspaces)
.where(inArray(executionWorkspaces.status, ["active", "idle", "in_review", "cleanup_failed"]));
for (const row of executionWorkspaceRows) {
const config = readExecutionWorkspaceConfig((row.metadata as Record<string, unknown> | null) ?? null);
if (config?.desiredState !== "running" || !config.workspaceRuntime || !row.cwd) continue;
try {
const refs = await startRuntimeServicesForWorkspaceControl({
db,
actor: { id: null, name: "Paperclip", companyId: row.companyId },
issue: row.sourceIssueId
? {
id: row.sourceIssueId,
identifier: null,
title: row.name,
}
: null,
workspace: {
baseCwd: row.cwd,
source: row.mode === "shared_workspace" ? "project_primary" : "task_session",
projectId: row.projectId,
workspaceId: row.projectWorkspaceId ?? null,
repoUrl: row.repoUrl ?? null,
repoRef: row.baseRef ?? null,
strategy: row.strategyType === "git_worktree" ? "git_worktree" : "project_primary",
cwd: row.cwd,
branchName: row.branchName ?? null,
worktreePath: row.strategyType === "git_worktree" ? row.cwd : null,
warnings: [],
created: false,
},
executionWorkspaceId: row.id,
config: { workspaceRuntime: config.workspaceRuntime },
adapterEnv: {},
});
if (refs.length > 0) restarted += refs.filter((ref) => !ref.reused).length;
} catch {
failed += 1;
}
}
return { restarted, failed };
}
export async function persistAdapterManagedRuntimeServices(input: {
db: Db;
adapterType: string;