Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/remove-worker-create-endpoint.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: breaking
---

Remove the unused worker group management API endpoints (GET and POST /api/v1/workers).
Comment thread
ericallam marked this conversation as resolved.
6 changes: 6 additions & 0 deletions .server-changes/worker-queue-length-always-reported.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Optionally report worker queue length metrics continuously (enabled per-service via the RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED env var) so a queue's depth keeps being emitted even when nothing is dequeuing from it.
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,12 @@ const EnvironmentSchema = z
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
// Off by default. Enable on a single service (e.g. the engine worker) so only one
// instance reports worker queue length, rather than every replica.
RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED: z.string().default("0"),
RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS: z.coerce.number().int().default(30_000),
Comment thread
ericallam marked this conversation as resolved.
// Comma-separated cloud providers to exclude from worker queue length observation.
RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS: z.string().default("digitalocean"),
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10),
Expand Down
73 changes: 0 additions & 73 deletions apps/webapp/app/routes/api.v1.workers.ts

This file was deleted.

13 changes: 13 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { RunEngine } from "@internal/run-engine";
import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { createBatchGlobalRateLimiter } from "~/runEngine/concerns/batchGlobalRateLimiter.server";
import { SCHEDULED_WORKER_QUEUE_SUFFIX } from "~/runEngine/concerns/workerQueueSplit.server";
import { logger } from "~/services/logger.server";
import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server";
import { singleton } from "~/utils/singleton";
Expand Down Expand Up @@ -121,6 +122,18 @@ function createRunEngine() {
},
tracer,
meter,
workerQueueObserver: {
enabled: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED === "1",
intervalMs: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS,
// Also observe the scheduled split variant of each worker queue. The suffix
// naming convention lives in the webapp, so it is passed in here.
additionalQueueSuffixes: [SCHEDULED_WORKER_QUEUE_SUFFIX],
excludedCloudProviders: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS.split(
","
)
.map((provider) => provider.trim())
.filter(Boolean),
},
defaultMaxTtl: env.RUN_ENGINE_DEFAULT_MAX_TTL,
heartbeatTimeoutsMs: {
PENDING_EXECUTING: env.RUN_ENGINE_TIMEOUT_PENDING_EXECUTING,
Expand Down
103 changes: 100 additions & 3 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
import { Worker } from "@trigger.dev/redis-worker";
import { assertNever } from "assert-never";
import { EventEmitter } from "node:events";
import { setTimeout } from "node:timers/promises";
import { setInterval, setTimeout } from "node:timers/promises";
import { BatchQueue } from "../batch-queue/index.js";
import type {
BatchItem,
Expand Down Expand Up @@ -100,6 +100,7 @@ export class RunEngine {
private heartbeatTimeouts: HeartbeatTimeouts;
private repairSnapshotTimeoutMs: number;
private batchQueue: BatchQueue;
private workerQueueObserverAbortController?: AbortController;

prisma: PrismaClient;
readOnlyPrisma: PrismaReplicaClient;
Expand Down Expand Up @@ -474,6 +475,96 @@ export class RunEngine {
machines: this.options.machines,
billingCache: this.billingCache,
});

this.#startWorkerQueueObserver();
}

/**
* Refreshes the set of worker queues observed by the `runqueue.workerQueue.length`
* gauge from the WorkerInstanceGroup records, so the gauge reports each worker queue's
* length even when nothing is dequeuing from it. Includes hidden groups; excludes
* groups whose cloud provider is configured to be excluded (groups with no cloud
* provider are always included).
*
* Only MANAGED groups are observed. UNMANAGED groups are created per project
* (masterQueue `<projectId>-<name>`), so observing them would grow the set, and the
* per-tick Redis fanout, with the number of self-hosted-worker projects rather than
* with the managed regions this gauge is meant to track.
*/
async refreshWorkerQueueObservation() {
const suffixes = this.options.workerQueueObserver?.additionalQueueSuffixes ?? [];
const excludedCloudProviders = new Set(
(this.options.workerQueueObserver?.excludedCloudProviders ?? []).map((p) => p.toLowerCase())
);

// Read from the replica: this is a periodic metrics-only read and worker groups change
// rarely, so a little replication lag is fine and keeps it off the primary.
const workerGroups = await this.readOnlyPrisma.workerInstanceGroup.findMany({
where: { type: "MANAGED" },
select: { masterQueue: true, cloudProvider: true },
});

const workerQueues: string[] = [];

for (const { masterQueue, cloudProvider } of workerGroups) {
if (cloudProvider && excludedCloudProviders.has(cloudProvider.toLowerCase())) {
continue;
}

workerQueues.push(masterQueue);

for (const suffix of suffixes) {
workerQueues.push(`${masterQueue}${suffix}`);
}
}

this.runQueue.setObservableWorkerQueues(workerQueues);
}

#startWorkerQueueObserver() {
if (!this.options.workerQueueObserver?.enabled) {
return;
}

const intervalMs = this.options.workerQueueObserver.intervalMs ?? 30_000;
this.workerQueueObserverAbortController = new AbortController();

this.#runWorkerQueueObserver(
intervalMs,
this.workerQueueObserverAbortController.signal
).catch((error) => {
this.logger.error("Worker queue observer loop crashed", {
error: error instanceof Error ? error.message : String(error),
});
});
}

async #runWorkerQueueObserver(intervalMs: number, signal: AbortSignal) {
const refresh = async () => {
try {
await this.refreshWorkerQueueObservation();
} catch (error) {
this.logger.error("Failed to refresh worker queue observation", {
error: error instanceof Error ? error.message : String(error),
});
}
};

// Refresh once immediately so a freshly started instance reports queue lengths
// without waiting for the first interval, then keep it fresh on an interval.
await refresh();

try {
for await (const _ of setInterval(intervalMs, null, { signal })) {
await refresh();
}
} catch (error) {
if (error instanceof Error && error.name !== "AbortError") {
throw error;
}

this.logger.debug("Worker queue observer stopped");
}
}

//MARK: - Run functions
Expand Down Expand Up @@ -1322,8 +1413,11 @@ export class RunEngine {
blockingPop?: boolean;
blockingPopTimeoutSeconds?: number;
}): Promise<DequeuedMessage[]> {
if (!skipObserving) {
// We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues
// We only do this with "prod" worker queues because we don't want to observe dev (e.g.
// environment) worker queues. When the worker queue observer is enabled it is the source
// of truth for the observed set (and applies the cloud-provider exclusions), so the
// per-dequeue registration is skipped.
if (!skipObserving && !this.options.workerQueueObserver?.enabled) {
this.runQueue.registerObservableWorkerQueue(workerQueue);
}

Expand Down Expand Up @@ -2061,6 +2155,9 @@ export class RunEngine {

async quit() {
try {
// stop the worker queue observer loop
this.workerQueueObserverAbortController?.abort();

//stop the run queue
await this.runQueue.quit();
await this.worker.stop();
Expand Down
Loading