mirror of
https://github.com/alkimake/paperclip.git
synced 2026-06-15 02:20:38 +09:00
Fix workspace runtime state reconciliation
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
parent
5a9a2a9112
commit
f515f2aa12
9 changed files with 477 additions and 64 deletions
|
|
@ -14,6 +14,10 @@ import type {
|
|||
WorkspaceRuntimeService,
|
||||
} from "@paperclipai/shared";
|
||||
import { parseProjectExecutionWorkspacePolicy } from "./execution-workspace-policy.js";
|
||||
import {
|
||||
listCurrentRuntimeServicesForExecutionWorkspaces,
|
||||
listCurrentRuntimeServicesForProjectWorkspaces,
|
||||
} from "./workspace-runtime-read-model.js";
|
||||
|
||||
type ExecutionWorkspaceRow = typeof executionWorkspaces.$inferSelect;
|
||||
type WorkspaceRuntimeServiceRow = typeof workspaceRuntimeServices.$inferSelect;
|
||||
|
|
@ -317,6 +321,41 @@ function toExecutionWorkspace(
|
|||
};
|
||||
}
|
||||
|
||||
function usesInheritedProjectRuntimeServices(row: ExecutionWorkspaceRow) {
|
||||
if (row.mode !== "shared_workspace" || !row.projectWorkspaceId) return false;
|
||||
return !readExecutionWorkspaceConfig((row.metadata as Record<string, unknown> | null) ?? null)?.workspaceRuntime;
|
||||
}
|
||||
|
||||
async function loadEffectiveRuntimeServicesByExecutionWorkspace(
|
||||
db: Db,
|
||||
companyId: string,
|
||||
rows: ExecutionWorkspaceRow[],
|
||||
) {
|
||||
const executionRuntimeServices = await listCurrentRuntimeServicesForExecutionWorkspaces(
|
||||
db,
|
||||
companyId,
|
||||
rows.map((row) => row.id),
|
||||
);
|
||||
const projectWorkspaceIds = rows
|
||||
.filter((row) => usesInheritedProjectRuntimeServices(row))
|
||||
.map((row) => row.projectWorkspaceId)
|
||||
.filter((value): value is string => Boolean(value));
|
||||
const projectRuntimeServices = await listCurrentRuntimeServicesForProjectWorkspaces(
|
||||
db,
|
||||
companyId,
|
||||
[...new Set(projectWorkspaceIds)],
|
||||
);
|
||||
|
||||
return new Map(
|
||||
rows.map((row) => [
|
||||
row.id,
|
||||
usesInheritedProjectRuntimeServices(row)
|
||||
? (projectRuntimeServices.get(row.projectWorkspaceId!) ?? [])
|
||||
: (executionRuntimeServices.get(row.id) ?? []),
|
||||
]),
|
||||
);
|
||||
}
|
||||
|
||||
export function executionWorkspaceService(db: Db) {
|
||||
return {
|
||||
list: async (companyId: string, filters?: {
|
||||
|
|
@ -346,7 +385,13 @@ export function executionWorkspaceService(db: Db) {
|
|||
.from(executionWorkspaces)
|
||||
.where(and(...conditions))
|
||||
.orderBy(desc(executionWorkspaces.lastUsedAt), desc(executionWorkspaces.createdAt));
|
||||
return rows.map((row) => toExecutionWorkspace(row));
|
||||
const runtimeServicesByWorkspaceId = await loadEffectiveRuntimeServicesByExecutionWorkspace(db, companyId, rows);
|
||||
return rows.map((row) =>
|
||||
toExecutionWorkspace(
|
||||
row,
|
||||
(runtimeServicesByWorkspaceId.get(row.id) ?? []).map(toRuntimeService),
|
||||
),
|
||||
);
|
||||
},
|
||||
|
||||
getById: async (id: string) => {
|
||||
|
|
@ -356,12 +401,11 @@ export function executionWorkspaceService(db: Db) {
|
|||
.where(eq(executionWorkspaces.id, id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!row) return null;
|
||||
const runtimeServiceRows = await db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(eq(workspaceRuntimeServices.executionWorkspaceId, row.id))
|
||||
.orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));
|
||||
return toExecutionWorkspace(row, runtimeServiceRows.map(toRuntimeService));
|
||||
const runtimeServicesByWorkspaceId = await loadEffectiveRuntimeServicesByExecutionWorkspace(db, row.companyId, [row]);
|
||||
return toExecutionWorkspace(
|
||||
row,
|
||||
(runtimeServicesByWorkspaceId.get(row.id) ?? []).map(toRuntimeService),
|
||||
);
|
||||
},
|
||||
|
||||
getCloseReadiness: async (id: string): Promise<ExecutionWorkspaceCloseReadiness | null> => {
|
||||
|
|
@ -372,12 +416,8 @@ export function executionWorkspaceService(db: Db) {
|
|||
.then((rows) => rows[0] ?? null);
|
||||
if (!workspace) return null;
|
||||
|
||||
const runtimeServiceRows = await db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(eq(workspaceRuntimeServices.executionWorkspaceId, workspace.id))
|
||||
.orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));
|
||||
const runtimeServices = runtimeServiceRows.map(toRuntimeService);
|
||||
const runtimeServicesByWorkspaceId = await loadEffectiveRuntimeServicesByExecutionWorkspace(db, workspace.companyId, [workspace]);
|
||||
const runtimeServices = (runtimeServicesByWorkspaceId.get(workspace.id) ?? []).map(toRuntimeService);
|
||||
|
||||
const linkedIssues = await db
|
||||
.select({
|
||||
|
|
|
|||
|
|
@ -184,7 +184,31 @@ export async function findLocalServiceRegistryRecordByRuntimeServiceId(input: {
|
|||
const records = await listLocalServiceRegistryRecords(
|
||||
input.profileKind ? { profileKind: input.profileKind } : undefined,
|
||||
);
|
||||
return records.find((record) => record.runtimeServiceId === input.runtimeServiceId) ?? null;
|
||||
const record = records.find((entry) => entry.runtimeServiceId === input.runtimeServiceId) ?? null;
|
||||
if (!record) return null;
|
||||
|
||||
let candidate = record;
|
||||
if (!isPidAlive(candidate.pid)) {
|
||||
const ownerPid = candidate.port ? await readLocalServicePortOwner(candidate.port) : null;
|
||||
if (!ownerPid) {
|
||||
await removeLocalServiceRegistryRecord(candidate.serviceKey);
|
||||
return null;
|
||||
}
|
||||
candidate = {
|
||||
...candidate,
|
||||
pid: ownerPid,
|
||||
processGroupId: candidate.processGroupId && isPidAlive(candidate.processGroupId) ? candidate.processGroupId : ownerPid,
|
||||
lastSeenAt: new Date().toISOString(),
|
||||
};
|
||||
await writeLocalServiceRegistryRecord(candidate);
|
||||
}
|
||||
|
||||
if (!(await isLikelyMatchingCommand(candidate))) {
|
||||
await removeLocalServiceRegistryRecord(record.serviceKey);
|
||||
return null;
|
||||
}
|
||||
|
||||
return candidate;
|
||||
}
|
||||
|
||||
export function isPidAlive(pid: number) {
|
||||
|
|
@ -203,7 +227,10 @@ async function isLikelyMatchingCommand(record: LocalServiceRegistryRecord) {
|
|||
const { stdout } = await execFileAsync("ps", ["-o", "command=", "-p", String(record.pid)]);
|
||||
const commandLine = stdout.trim();
|
||||
if (!commandLine) return false;
|
||||
return commandLine.includes(record.command) || commandLine.includes(record.serviceName);
|
||||
const normalize = (value: string) => value.replace(/["']/g, "").replace(/\s+/g, " ").trim();
|
||||
const normalizedCommandLine = normalize(commandLine);
|
||||
const normalizedRecordedCommand = normalize(record.command);
|
||||
return normalizedCommandLine.includes(normalizedRecordedCommand) || normalizedCommandLine.includes(record.serviceName);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import {
|
|||
type ProjectWorkspace,
|
||||
type WorkspaceRuntimeService,
|
||||
} from "@paperclipai/shared";
|
||||
import { listWorkspaceRuntimeServicesForProjectWorkspaces } from "./workspace-runtime.js";
|
||||
import { listCurrentRuntimeServicesForProjectWorkspaces } from "./workspace-runtime-read-model.js";
|
||||
import { parseProjectExecutionWorkspacePolicy } from "./execution-workspace-policy.js";
|
||||
import { mergeProjectWorkspaceRuntimeConfig, readProjectWorkspaceRuntimeConfig } from "./project-workspace-runtime-config.js";
|
||||
import { resolveManagedProjectWorkspaceDir } from "../home-paths.js";
|
||||
|
|
@ -223,7 +223,7 @@ async function attachWorkspaces(db: Db, rows: ProjectWithGoals[]): Promise<Proje
|
|||
.from(projectWorkspaces)
|
||||
.where(inArray(projectWorkspaces.projectId, projectIds))
|
||||
.orderBy(desc(projectWorkspaces.isPrimary), asc(projectWorkspaces.createdAt), asc(projectWorkspaces.id));
|
||||
const runtimeServicesByWorkspaceId = await listWorkspaceRuntimeServicesForProjectWorkspaces(
|
||||
const runtimeServicesByWorkspaceId = await listCurrentRuntimeServicesForProjectWorkspaces(
|
||||
db,
|
||||
rows[0]!.companyId,
|
||||
workspaceRows.map((workspace) => workspace.id),
|
||||
|
|
@ -541,7 +541,7 @@ export function projectService(db: Db) {
|
|||
.where(eq(projectWorkspaces.projectId, projectId))
|
||||
.orderBy(desc(projectWorkspaces.isPrimary), asc(projectWorkspaces.createdAt), asc(projectWorkspaces.id));
|
||||
if (rows.length === 0) return [];
|
||||
const runtimeServicesByWorkspaceId = await listWorkspaceRuntimeServicesForProjectWorkspaces(
|
||||
const runtimeServicesByWorkspaceId = await listCurrentRuntimeServicesForProjectWorkspaces(
|
||||
db,
|
||||
rows[0]!.companyId,
|
||||
rows.map((workspace) => workspace.id),
|
||||
|
|
|
|||
96
server/src/services/workspace-runtime-read-model.ts
Normal file
96
server/src/services/workspace-runtime-read-model.ts
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
import type { Db } from "@paperclipai/db";
|
||||
import { workspaceRuntimeServices } from "@paperclipai/db";
|
||||
import { and, desc, eq, inArray } from "drizzle-orm";
|
||||
|
||||
type WorkspaceRuntimeServiceRow = typeof workspaceRuntimeServices.$inferSelect;
|
||||
|
||||
function runtimeServiceIdentityKey(row: WorkspaceRuntimeServiceRow) {
|
||||
if (row.reuseKey) return row.reuseKey;
|
||||
return [
|
||||
row.scopeType,
|
||||
row.scopeId ?? "",
|
||||
row.projectWorkspaceId ?? "",
|
||||
row.executionWorkspaceId ?? "",
|
||||
row.serviceName,
|
||||
row.command ?? "",
|
||||
row.cwd ?? "",
|
||||
].join(":");
|
||||
}
|
||||
|
||||
export function selectCurrentRuntimeServiceRows(rows: WorkspaceRuntimeServiceRow[]) {
|
||||
const current = new Map<string, WorkspaceRuntimeServiceRow>();
|
||||
for (const row of rows) {
|
||||
const identity = runtimeServiceIdentityKey(row);
|
||||
if (!current.has(identity)) current.set(identity, row);
|
||||
}
|
||||
return [...current.values()];
|
||||
}
|
||||
|
||||
export async function listCurrentRuntimeServicesForProjectWorkspaces(
|
||||
db: Db,
|
||||
companyId: string,
|
||||
projectWorkspaceIds: string[],
|
||||
) {
|
||||
if (projectWorkspaceIds.length === 0) return new Map<string, WorkspaceRuntimeServiceRow[]>();
|
||||
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(
|
||||
and(
|
||||
eq(workspaceRuntimeServices.companyId, companyId),
|
||||
inArray(workspaceRuntimeServices.projectWorkspaceId, projectWorkspaceIds),
|
||||
eq(workspaceRuntimeServices.scopeType, "project_workspace"),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));
|
||||
|
||||
const grouped = new Map<string, WorkspaceRuntimeServiceRow[]>();
|
||||
for (const row of rows) {
|
||||
if (!row.projectWorkspaceId) continue;
|
||||
const existing = grouped.get(row.projectWorkspaceId) ?? [];
|
||||
existing.push(row);
|
||||
grouped.set(row.projectWorkspaceId, existing);
|
||||
}
|
||||
|
||||
return new Map(
|
||||
Array.from(grouped.entries()).map(([workspaceId, workspaceRows]) => [
|
||||
workspaceId,
|
||||
selectCurrentRuntimeServiceRows(workspaceRows),
|
||||
]),
|
||||
);
|
||||
}
|
||||
|
||||
export async function listCurrentRuntimeServicesForExecutionWorkspaces(
|
||||
db: Db,
|
||||
companyId: string,
|
||||
executionWorkspaceIds: string[],
|
||||
) {
|
||||
if (executionWorkspaceIds.length === 0) return new Map<string, WorkspaceRuntimeServiceRow[]>();
|
||||
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(
|
||||
and(
|
||||
eq(workspaceRuntimeServices.companyId, companyId),
|
||||
inArray(workspaceRuntimeServices.executionWorkspaceId, executionWorkspaceIds),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));
|
||||
|
||||
const grouped = new Map<string, WorkspaceRuntimeServiceRow[]>();
|
||||
for (const row of rows) {
|
||||
if (!row.executionWorkspaceId) continue;
|
||||
const existing = grouped.get(row.executionWorkspaceId) ?? [];
|
||||
existing.push(row);
|
||||
grouped.set(row.executionWorkspaceId, existing);
|
||||
}
|
||||
|
||||
return new Map(
|
||||
Array.from(grouped.entries()).map(([workspaceId, workspaceRows]) => [
|
||||
workspaceId,
|
||||
selectCurrentRuntimeServiceRows(workspaceRows),
|
||||
]),
|
||||
);
|
||||
}
|
||||
|
|
@ -1081,6 +1081,16 @@ async function waitForReadiness(input: {
|
|||
throw new Error(`Readiness check failed for ${input.url}: ${lastError}`);
|
||||
}
|
||||
|
||||
async function isRuntimeServiceUrlHealthy(url: string | null) {
|
||||
if (!url) return true;
|
||||
try {
|
||||
const response = await fetch(url, { signal: AbortSignal.timeout(2_000) });
|
||||
return response.ok;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function toPersistedWorkspaceRuntimeService(record: RuntimeServiceRecord): typeof workspaceRuntimeServices.$inferInsert {
|
||||
return {
|
||||
id: record.id,
|
||||
|
|
@ -1847,50 +1857,55 @@ export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) {
|
|||
profileKind: "workspace-runtime",
|
||||
});
|
||||
if (adoptedRecord) {
|
||||
const record: RuntimeServiceRecord = {
|
||||
id: row.id,
|
||||
companyId: row.companyId,
|
||||
projectId: row.projectId ?? null,
|
||||
projectWorkspaceId: row.projectWorkspaceId ?? null,
|
||||
executionWorkspaceId: row.executionWorkspaceId ?? null,
|
||||
issueId: row.issueId ?? null,
|
||||
serviceName: row.serviceName,
|
||||
status: "running",
|
||||
lifecycle: row.lifecycle as RuntimeServiceRecord["lifecycle"],
|
||||
scopeType: row.scopeType as RuntimeServiceRecord["scopeType"],
|
||||
scopeId: row.scopeId ?? null,
|
||||
reuseKey: row.reuseKey ?? null,
|
||||
command: row.command ?? null,
|
||||
cwd: row.cwd ?? null,
|
||||
port: adoptedRecord.port ?? row.port ?? null,
|
||||
url: adoptedRecord.url ?? row.url ?? null,
|
||||
provider: "local_process",
|
||||
providerRef: String(adoptedRecord.pid),
|
||||
ownerAgentId: row.ownerAgentId ?? null,
|
||||
startedByRunId: row.startedByRunId ?? null,
|
||||
lastUsedAt: new Date().toISOString(),
|
||||
startedAt: row.startedAt.toISOString(),
|
||||
stoppedAt: null,
|
||||
stopPolicy: (row.stopPolicy as Record<string, unknown> | null) ?? null,
|
||||
healthStatus: "healthy",
|
||||
reused: true,
|
||||
db,
|
||||
child: null,
|
||||
leaseRunIds: new Set(),
|
||||
idleTimer: null,
|
||||
envFingerprint: row.reuseKey ?? "",
|
||||
serviceKey: adoptedRecord.serviceKey,
|
||||
profileKind: "workspace-runtime",
|
||||
processGroupId: adoptedRecord.processGroupId ?? null,
|
||||
};
|
||||
registerRuntimeService(db, record);
|
||||
await touchLocalServiceRegistryRecord(adoptedRecord.serviceKey, {
|
||||
runtimeServiceId: row.id,
|
||||
lastSeenAt: record.lastUsedAt,
|
||||
});
|
||||
await persistRuntimeServiceRecord(db, record);
|
||||
adopted += 1;
|
||||
continue;
|
||||
const adoptedUrl = adoptedRecord.url ?? row.url ?? null;
|
||||
if (!(await isRuntimeServiceUrlHealthy(adoptedUrl))) {
|
||||
await removeLocalServiceRegistryRecord(adoptedRecord.serviceKey);
|
||||
} else {
|
||||
const record: RuntimeServiceRecord = {
|
||||
id: row.id,
|
||||
companyId: row.companyId,
|
||||
projectId: row.projectId ?? null,
|
||||
projectWorkspaceId: row.projectWorkspaceId ?? null,
|
||||
executionWorkspaceId: row.executionWorkspaceId ?? null,
|
||||
issueId: row.issueId ?? null,
|
||||
serviceName: row.serviceName,
|
||||
status: "running",
|
||||
lifecycle: row.lifecycle as RuntimeServiceRecord["lifecycle"],
|
||||
scopeType: row.scopeType as RuntimeServiceRecord["scopeType"],
|
||||
scopeId: row.scopeId ?? null,
|
||||
reuseKey: row.reuseKey ?? null,
|
||||
command: row.command ?? null,
|
||||
cwd: row.cwd ?? null,
|
||||
port: adoptedRecord.port ?? row.port ?? null,
|
||||
url: adoptedRecord.url ?? row.url ?? null,
|
||||
provider: "local_process",
|
||||
providerRef: String(adoptedRecord.pid),
|
||||
ownerAgentId: row.ownerAgentId ?? null,
|
||||
startedByRunId: row.startedByRunId ?? null,
|
||||
lastUsedAt: new Date().toISOString(),
|
||||
startedAt: row.startedAt.toISOString(),
|
||||
stoppedAt: null,
|
||||
stopPolicy: (row.stopPolicy as Record<string, unknown> | null) ?? null,
|
||||
healthStatus: "healthy",
|
||||
reused: true,
|
||||
db,
|
||||
child: null,
|
||||
leaseRunIds: new Set(),
|
||||
idleTimer: null,
|
||||
envFingerprint: row.reuseKey ?? "",
|
||||
serviceKey: adoptedRecord.serviceKey,
|
||||
profileKind: "workspace-runtime",
|
||||
processGroupId: adoptedRecord.processGroupId ?? null,
|
||||
};
|
||||
registerRuntimeService(db, record);
|
||||
await touchLocalServiceRegistryRecord(adoptedRecord.serviceKey, {
|
||||
runtimeServiceId: row.id,
|
||||
lastSeenAt: record.lastUsedAt,
|
||||
});
|
||||
await persistRuntimeServiceRecord(db, record);
|
||||
adopted += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue