diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
index 5a951a645c..e8333bcce7 100644
--- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
+++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
@@ -1,6 +1,6 @@
 import type { BillingCache } from "../billingCache.js";
 import { startSpan } from "@internal/tracing";
-import { assertExhaustive } from "@trigger.dev/core";
+import { assertExhaustive, tryCatch } from "@trigger.dev/core";
 import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
 import { placementTag } from "@trigger.dev/core/v3/serverOnly";
 import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
@@ -611,20 +611,24 @@ export class DequeueSystem {
           }
         );

-        const run = await prisma.taskRun.findFirst({
-          where: { id: runId },
-          include: {
-            runtimeEnvironment: true,
-          },
-        });
+        // Wrap the Prisma call with tryCatch - if DB is unavailable, we still want to nack via Redis
+        const [findError, run] = await tryCatch(
+          prisma.taskRun.findFirst({
+            where: { id: runId },
+            include: {
+              runtimeEnvironment: true,
+            },
+          })
+        );

-        if (!run) {
-          //this isn't ideal because we're not creating a snapshot… but we can't do much else
+        // If DB is unavailable or run not found, just nack directly via Redis
+        if (findError || !run) {
           this.$.logger.error(
-            "RunEngine.dequeueFromWorkerQueue(): Thrown error, then run not found. Nacking.",
+            "RunEngine.dequeueFromWorkerQueue(): Failed to find run, nacking directly via Redis",
             {
               runId,
               orgId,
+              findError,
             }
           );
           await this.$.runQueue.nackMessage({ orgId, messageId: runId });
diff --git a/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts b/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
index 8ea142f630..5871bee832 100644
--- a/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
@@ -1,9 +1,10 @@
-import { containerTest } from "@internal/testcontainers";
+import { assertNonNullable, containerTest } from "@internal/testcontainers";
 import { trace } from "@internal/tracing";
 import { DequeuedMessage } from "@trigger.dev/core/v3";
 import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic";
 import { PrismaClientOrTransaction } from "@trigger.dev/database";
 import { expect } from "vitest";
+import { setTimeout } from "node:timers/promises";
 import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
 import { RunEngine } from "../index.js";
 import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
@@ -79,6 +80,137 @@ describe("RunEngine dequeuing", () => {
       await engine.quit();
     }
   });
+
+  containerTest(
+    "Direct nack after dequeue clears concurrency and allows recovery",
+    async ({ prisma, redisOptions }) => {
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      // Use a short heartbeat timeout so the stalled system recovers the run quickly
+      const pendingExecutingTimeout = 1000;
+
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+        },
+        queue: {
+          redis: redisOptions,
+          masterQueueConsumersDisabled: true,
+          processWorkerQueueDebounceMs: 50,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        heartbeatTimeoutsMs: {
+          PENDING_EXECUTING: pendingExecutingTimeout,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      try {
+        const taskIdentifier = "test-task";
+
+        // Setup background worker
+        await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
+
+        // Trigger a single run
+        const runs = await triggerRuns({
+          engine,
+          environment: authenticatedEnvironment,
+          taskIdentifier,
+          prisma,
+          count: 1,
+        });
+        expect(runs.length).toBe(1);
+        const run = runs[0];
+
+        // Process master queue to move run to worker queue
+        await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 1);
+
+        // Wait for processing
+        await setTimeout(500);
+
+        // Dequeue from worker queue - this puts run in concurrency sets and creates PENDING_EXECUTING snapshot
+        const dequeued = await engine.dequeueFromWorkerQueue({
+          consumerId: "test_12345",
+          workerQueue: "main",
+        });
+        expect(dequeued.length).toBe(1);
+        assertNonNullable(dequeued[0]);
+
+        // Verify run is in PENDING_EXECUTING state
+        const executionDataBefore = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionDataBefore);
+        expect(executionDataBefore.snapshot.executionStatus).toBe("PENDING_EXECUTING");
+
+        // Verify run is in concurrency
+        const envConcurrencyBefore = await engine.runQueue.currentConcurrencyOfEnvironment(
+          authenticatedEnvironment
+        );
+        expect(envConcurrencyBefore).toBe(1);
+
+        // Simulate DB failure fallback: call nackMessage directly via Redis
+        // This is what happens when the catch block can't read from Postgres
+        await engine.runQueue.nackMessage({
+          orgId: authenticatedEnvironment.organization.id,
+          messageId: run.id,
+        });
+
+        // Verify concurrency is cleared - this is the key fix!
+        // Without this fix, the run would stay in concurrency sets forever
+        const envConcurrencyAfter = await engine.runQueue.currentConcurrencyOfEnvironment(
+          authenticatedEnvironment
+        );
+        expect(envConcurrencyAfter).toBe(0);
+
+        // Verify the message is back in the queue
+        const envQueueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
+        expect(envQueueLength).toBe(1);
+
+        // Wait for the stalled system to detect and recover the PENDING_EXECUTING run
+        // The stalled system will call tryNackAndRequeue which updates Postgres state to QUEUED
+        await setTimeout(pendingExecutingTimeout * 5);
+
+        // Verify the stalled system recovered the run to QUEUED state
+        const executionDataAfterStall = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionDataAfterStall);
+        expect(executionDataAfterStall.snapshot.executionStatus).toBe("QUEUED");
+
+        // Process master queue to move the run from env queue to worker queue
+        await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 1);
+
+        // Wait for processing
+        await setTimeout(500);
+
+        // Dequeue from worker queue - the run should now be available
+        const dequeuedAgain = await engine.dequeueFromWorkerQueue({
+          consumerId: "test_12345",
+          workerQueue: "main",
+        });
+        expect(dequeuedAgain.length).toBe(1);
+        assertNonNullable(dequeuedAgain[0]);
+        expect(dequeuedAgain[0].run.id).toBe(run.id);
+      } finally {
+        await engine.quit();
+      }
+    }
+  );
 });

 async function triggerRuns({