Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { BillingCache } from "../billingCache.js";
import { startSpan } from "@internal/tracing";
import { assertExhaustive } from "@trigger.dev/core";
import { assertExhaustive, tryCatch } from "@trigger.dev/core";
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
import { placementTag } from "@trigger.dev/core/v3/serverOnly";
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
Expand Down Expand Up @@ -611,20 +611,24 @@ export class DequeueSystem {
}
);

const run = await prisma.taskRun.findFirst({
where: { id: runId },
include: {
runtimeEnvironment: true,
},
});
// Wrap the Prisma call with tryCatch - if DB is unavailable, we still want to nack via Redis
const [findError, run] = await tryCatch(
prisma.taskRun.findFirst({
where: { id: runId },
include: {
runtimeEnvironment: true,
},
})
);

if (!run) {
//this isn't ideal because we're not creating a snapshot… but we can't do much else
// If DB is unavailable or run not found, just nack directly via Redis
if (findError || !run) {
this.$.logger.error(
"RunEngine.dequeueFromWorkerQueue(): Thrown error, then run not found. Nacking.",
"RunEngine.dequeueFromWorkerQueue(): Failed to find run, nacking directly via Redis",
{
runId,
orgId,
findError,
}
);
await this.$.runQueue.nackMessage({ orgId, messageId: runId });
Expand Down
134 changes: 133 additions & 1 deletion internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { containerTest } from "@internal/testcontainers";
import { assertNonNullable, containerTest } from "@internal/testcontainers";
import { trace } from "@internal/tracing";
import { DequeuedMessage } from "@trigger.dev/core/v3";
import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic";
import { PrismaClientOrTransaction } from "@trigger.dev/database";
import { expect } from "vitest";
import { setTimeout } from "node:timers/promises";
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
import { RunEngine } from "../index.js";
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
Expand Down Expand Up @@ -79,6 +80,137 @@ describe("RunEngine dequeuing", () => {
await engine.quit();
}
});

containerTest(
"Direct nack after dequeue clears concurrency and allows recovery",
async ({ prisma, redisOptions }) => {
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

// Use a short heartbeat timeout so the stalled system recovers the run quickly
const pendingExecutingTimeout = 1000;

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
masterQueueConsumersDisabled: true,
processWorkerQueueDebounceMs: 50,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0005,
},
heartbeatTimeoutsMs: {
PENDING_EXECUTING: pendingExecutingTimeout,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const taskIdentifier = "test-task";

// Setup background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

// Trigger a single run
const runs = await triggerRuns({
engine,
environment: authenticatedEnvironment,
taskIdentifier,
prisma,
count: 1,
});
expect(runs.length).toBe(1);
const run = runs[0];

// Process master queue to move run to worker queue
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 1);

// Wait for processing
await setTimeout(500);

// Dequeue from worker queue - this puts run in concurrency sets and creates PENDING_EXECUTING snapshot
const dequeued = await engine.dequeueFromWorkerQueue({
consumerId: "test_12345",
workerQueue: "main",
});
expect(dequeued.length).toBe(1);
assertNonNullable(dequeued[0]);

// Verify run is in PENDING_EXECUTING state
const executionDataBefore = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionDataBefore);
expect(executionDataBefore.snapshot.executionStatus).toBe("PENDING_EXECUTING");

// Verify run is in concurrency
const envConcurrencyBefore = await engine.runQueue.currentConcurrencyOfEnvironment(
authenticatedEnvironment
);
expect(envConcurrencyBefore).toBe(1);

// Simulate DB failure fallback: call nackMessage directly via Redis
// This is what happens when the catch block can't read from Postgres
await engine.runQueue.nackMessage({
orgId: authenticatedEnvironment.organization.id,
messageId: run.id,
});

// Verify concurrency is cleared - this is the key fix!
// Without this fix, the run would stay in concurrency sets forever
const envConcurrencyAfter = await engine.runQueue.currentConcurrencyOfEnvironment(
authenticatedEnvironment
);
expect(envConcurrencyAfter).toBe(0);

// Verify the message is back in the queue
const envQueueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(envQueueLength).toBe(1);

// Wait for the stalled system to detect and recover the PENDING_EXECUTING run
// The stalled system will call tryNackAndRequeue which updates Postgres state to QUEUED
await setTimeout(pendingExecutingTimeout * 5);

// Verify the stalled system recovered the run to QUEUED state
const executionDataAfterStall = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionDataAfterStall);
expect(executionDataAfterStall.snapshot.executionStatus).toBe("QUEUED");

// Process master queue to move the run from env queue to worker queue
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 1);

// Wait for processing
await setTimeout(500);

// Dequeue from worker queue - the run should now be available
const dequeuedAgain = await engine.dequeueFromWorkerQueue({
consumerId: "test_12345",
workerQueue: "main",
});
expect(dequeuedAgain.length).toBe(1);
assertNonNullable(dequeuedAgain[0]);
expect(dequeuedAgain[0].run.id).toBe(run.id);
} finally {
await engine.quit();
}
}
);
});

async function triggerRuns({
Expand Down