Skip to content

Commit 3ee9180

Browse files
committed
Improve the efficiency of expiring runs in batch, and make sure that runs expired by the TTL system are properly removed from their queues and that those queues are rebalanced afterwards
1 parent 998d93c commit 3ee9180

File tree

7 files changed

+626
-207
lines changed

7 files changed

+626
-207
lines changed

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import { RunEngineTriggerTaskService } from "../../app/runEngine/services/trigge
4040
import { promiseWithResolvers } from "@trigger.dev/core";
4141
import { setTimeout } from "node:timers/promises";
4242

43-
vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
43+
vi.setConfig({ testTimeout: 60_000 }); // 60 seconds timeout
4444

4545
class MockPayloadProcessor implements PayloadProcessor {
4646
async process(request: TriggerTaskRequest): Promise<IOPacket> {
@@ -78,9 +78,9 @@ class MockTraceEventConcern implements TraceEventConcern {
7878
spanId: "test",
7979
traceContext: {},
8080
traceparent: undefined,
81-
setAttribute: () => {},
82-
failWithError: () => {},
83-
stop: () => {},
81+
setAttribute: () => { },
82+
failWithError: () => { },
83+
stop: () => { },
8484
},
8585
"test"
8686
);
@@ -103,9 +103,9 @@ class MockTraceEventConcern implements TraceEventConcern {
103103
spanId: "test",
104104
traceContext: {},
105105
traceparent: undefined,
106-
setAttribute: () => {},
107-
failWithError: () => {},
108-
stop: () => {},
106+
setAttribute: () => { },
107+
failWithError: () => { },
108+
stop: () => { },
109109
},
110110
"test"
111111
);
@@ -128,9 +128,9 @@ class MockTraceEventConcern implements TraceEventConcern {
128128
spanId: "test",
129129
traceContext: {},
130130
traceparent: undefined,
131-
setAttribute: () => {},
132-
failWithError: () => {},
133-
stop: () => {},
131+
setAttribute: () => { },
132+
failWithError: () => { },
133+
stop: () => { },
134134
},
135135
"test"
136136
);

internal-packages/run-engine/src/batch-queue/completionTracker.ts

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ export class BatchCompletionTracker {
4545
}) {
4646
this.redis = createRedisClient(options.redis);
4747
this.logger = options.logger ?? {
48-
debug: () => {},
49-
info: () => {},
50-
error: () => {},
48+
debug: () => { },
49+
info: () => { },
50+
error: () => { },
5151
};
5252

5353
this.#registerCommands();
@@ -109,26 +109,6 @@ export class BatchCompletionTracker {
109109
return JSON.parse(metaJson) as BatchMeta;
110110
}
111111

112-
/**
113-
* Update the runCount in batch metadata.
114-
* Used when items are skipped due to queue limits.
115-
*/
116-
async updateRunCount(batchId: string, newRunCount: number): Promise<void> {
117-
const meta = await this.getMeta(batchId);
118-
if (!meta) {
119-
this.logger.error("Cannot update runCount: batch metadata not found", { batchId });
120-
return;
121-
}
122-
123-
const updatedMeta: BatchMeta = {
124-
...meta,
125-
runCount: newRunCount,
126-
};
127-
128-
await this.storeMeta(batchId, updatedMeta);
129-
this.logger.debug("Updated batch runCount", { batchId, oldRunCount: meta.runCount, newRunCount });
130-
}
131-
132112
// ============================================================================
133113
// Success/Failure Recording (Idempotent)
134114
// ============================================================================

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -183,17 +183,17 @@ export class BatchQueue {
183183
// so we don't need the DLQ - we just need the retry scheduling.
184184
...(options.retry
185185
? {
186-
retry: {
187-
strategy: new ExponentialBackoffRetry({
188-
maxAttempts: options.retry.maxAttempts,
189-
minTimeoutInMs: options.retry.minTimeoutInMs ?? 1_000,
190-
maxTimeoutInMs: options.retry.maxTimeoutInMs ?? 30_000,
191-
factor: options.retry.factor ?? 2,
192-
randomize: options.retry.randomize ?? true,
193-
}),
194-
deadLetterQueue: false,
195-
},
196-
}
186+
retry: {
187+
strategy: new ExponentialBackoffRetry({
188+
maxAttempts: options.retry.maxAttempts,
189+
minTimeoutInMs: options.retry.minTimeoutInMs ?? 1_000,
190+
maxTimeoutInMs: options.retry.maxTimeoutInMs ?? 30_000,
191+
factor: options.retry.factor ?? 2,
192+
randomize: options.retry.randomize ?? true,
193+
}),
194+
deadLetterQueue: false,
195+
},
196+
}
197197
: {}),
198198
logger: this.logger,
199199
tracer: options.tracer,
@@ -395,14 +395,6 @@ export class BatchQueue {
395395
return this.completionTracker.getEnqueuedCount(batchId);
396396
}
397397

398-
/**
399-
* Update the runCount for a batch.
400-
* Used when items are skipped due to queue limits.
401-
*/
402-
async updateRunCount(batchId: string, newRunCount: number): Promise<void> {
403-
return this.completionTracker.updateRunCount(batchId, newRunCount);
404-
}
405-
406398
// ============================================================================
407399
// Public API - Query
408400
// ============================================================================

internal-packages/run-engine/src/engine/index.ts

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,11 @@ export class RunEngine {
190190
ttlSystem: options.queue?.ttlSystem?.disabled
191191
? undefined
192192
: {
193-
shardCount: options.queue?.ttlSystem?.shardCount,
194-
pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs,
195-
batchSize: options.queue?.ttlSystem?.batchSize,
196-
callback: this.#ttlExpiredCallback.bind(this),
197-
},
193+
shardCount: options.queue?.ttlSystem?.shardCount,
194+
pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs,
195+
batchSize: options.queue?.ttlSystem?.batchSize,
196+
callback: this.#ttlExpiredCallback.bind(this),
197+
},
198198
});
199199

200200
this.worker = new Worker({
@@ -655,11 +655,11 @@ export class RunEngine {
655655
associatedWaitpoint:
656656
resumeParentOnCompletion && parentTaskRunId
657657
? {
658-
create: this.waitpointSystem.buildRunAssociatedWaitpoint({
659-
projectId: environment.project.id,
660-
environmentId: environment.id,
661-
}),
662-
}
658+
create: this.waitpointSystem.buildRunAssociatedWaitpoint({
659+
projectId: environment.project.id,
660+
environmentId: environment.id,
661+
}),
662+
}
663663
: undefined,
664664
},
665665
});
@@ -832,9 +832,9 @@ export class RunEngine {
832832
const waitpointData =
833833
resumeParentOnCompletion && parentTaskRunId
834834
? this.waitpointSystem.buildRunAssociatedWaitpoint({
835-
projectId: environment.project.id,
836-
environmentId: environment.id,
837-
})
835+
projectId: environment.project.id,
836+
environmentId: environment.id,
837+
})
838838
: undefined;
839839

840840
// Create the run in terminal SYSTEM_FAILURE status.
@@ -1340,14 +1340,6 @@ export class RunEngine {
13401340
return this.batchQueue.getEnqueuedCount(batchId);
13411341
}
13421342

1343-
/**
1344-
* Update the runCount for a batch.
1345-
* Used when items are skipped due to queue limits.
1346-
*/
1347-
async updateBatchRunCount(batchId: string, newRunCount: number): Promise<void> {
1348-
return this.batchQueue.updateRunCount(batchId, newRunCount);
1349-
}
1350-
13511343
async getWaitpoint({
13521344
waitpointId,
13531345
environmentId,

internal-packages/run-engine/src/engine/systems/ttlSystem.ts

Lines changed: 60 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
22
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
3-
import { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database";
3+
import { Prisma, PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database";
44
import { isExecuting } from "../statuses.js";
55
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
66
import { SystemResources } from "./systems.js";
77
import { WaitpointSystem } from "./waitpointSystem.js";
88
import { startSpan } from "@internal/tracing";
9+
import pMap from "p-map";
910

1011
export type TtlSystemOptions = {
1112
resources: SystemResources;
@@ -169,7 +170,7 @@ export class TtlSystem {
169170
const expired: string[] = [];
170171
const skipped: { runId: string; reason: string }[] = [];
171172

172-
// Fetch all runs with their snapshots in a single query
173+
// Fetch all runs in a single query (no snapshot data needed)
173174
const runs = await this.$.prisma.taskRun.findMany({
174175
where: { id: { in: runIds } },
175176
select: {
@@ -188,36 +189,13 @@ export class TtlSystem {
188189
projectId: true,
189190
},
190191
},
191-
executionSnapshots: {
192-
orderBy: { createdAt: "desc" },
193-
take: 1,
194-
select: {
195-
executionStatus: true,
196-
environmentId: true,
197-
environmentType: true,
198-
projectId: true,
199-
organizationId: true,
200-
},
201-
},
202192
},
203193
});
204194

205195
// Filter runs that can be expired
206196
const runsToExpire: typeof runs = [];
207197

208198
for (const run of runs) {
209-
const latestSnapshot = run.executionSnapshots[0];
210-
211-
if (!latestSnapshot) {
212-
skipped.push({ runId: run.id, reason: "no_snapshot" });
213-
continue;
214-
}
215-
216-
if (isExecuting(latestSnapshot.executionStatus)) {
217-
skipped.push({ runId: run.id, reason: "executing" });
218-
continue;
219-
}
220-
221199
if (run.status !== "PENDING") {
222200
skipped.push({ runId: run.id, reason: `status_${run.status}` });
223201
continue;
@@ -245,79 +223,70 @@ export class TtlSystem {
245223
return { expired, skipped };
246224
}
247225

248-
// Update all runs in a single batch
226+
// Update all runs in a single SQL call (status, dates, and error JSON)
249227
const now = new Date();
250228
const runIdsToExpire = runsToExpire.map((r) => r.id);
251229

252-
await this.$.prisma.taskRun.updateMany({
253-
where: { id: { in: runIdsToExpire } },
254-
data: {
255-
status: "EXPIRED" as TaskRunStatus,
256-
completedAt: now,
257-
expiredAt: now,
258-
// Note: updateMany doesn't support nested writes, so we handle error and snapshots separately
259-
},
260-
});
230+
const error: TaskRunError = {
231+
type: "STRING_ERROR",
232+
raw: "Run expired because the TTL was reached",
233+
};
234+
235+
await this.$.prisma.$executeRaw`
236+
UPDATE "TaskRun"
237+
SET "status" = 'EXPIRED'::"TaskRunStatus",
238+
"completedAt" = ${now},
239+
"expiredAt" = ${now},
240+
"updatedAt" = ${now},
241+
"error" = ${JSON.stringify(error)}::jsonb
242+
WHERE "id" IN (${Prisma.join(runIdsToExpire)})
243+
`;
244+
245+
// Process each run: enqueue waitpoint completion jobs and emit events
246+
await pMap(
247+
runsToExpire,
248+
async (run) => {
249+
try {
250+
// Enqueue a finishWaitpoint worker job for resilient waitpoint completion
251+
if (run.associatedWaitpoint) {
252+
await this.$.worker.enqueue({
253+
id: `finishWaitpoint.ttl.${run.associatedWaitpoint.id}`,
254+
job: "finishWaitpoint",
255+
payload: {
256+
waitpointId: run.associatedWaitpoint.id,
257+
error: JSON.stringify(error),
258+
},
259+
});
260+
}
261+
262+
// Emit event
263+
this.$.eventBus.emit("runExpired", {
264+
run: {
265+
id: run.id,
266+
spanId: run.spanId,
267+
ttl: run.ttl,
268+
taskEventStore: run.taskEventStore,
269+
createdAt: run.createdAt,
270+
updatedAt: now,
271+
completedAt: now,
272+
expiredAt: now,
273+
status: "EXPIRED" as TaskRunStatus,
274+
},
275+
time: now,
276+
organization: { id: run.runtimeEnvironment.organizationId },
277+
project: { id: run.runtimeEnvironment.projectId },
278+
environment: { id: run.runtimeEnvironment.id },
279+
});
261280

262-
// Create snapshots and set errors for each run (these require individual updates)
263-
await Promise.all(
264-
runsToExpire.map(async (run) => {
265-
const latestSnapshot = run.executionSnapshots[0]!;
266-
const error: TaskRunError = {
267-
type: "STRING_ERROR",
268-
raw: `Run expired because the TTL (${run.ttl}) was reached`,
269-
};
270-
271-
// Update the error field (updateMany can't do JSON fields properly)
272-
await this.$.prisma.taskRun.update({
273-
where: { id: run.id },
274-
data: { error },
275-
});
276-
277-
// Create the snapshot
278-
await this.$.prisma.taskRunExecutionSnapshot.create({
279-
data: {
281+
expired.push(run.id);
282+
} catch (e) {
283+
this.$.logger.error("Failed to process expired run", {
280284
runId: run.id,
281-
engine: "V2",
282-
executionStatus: "FINISHED",
283-
description: "Run was expired because the TTL was reached",
284-
runStatus: "EXPIRED",
285-
environmentId: latestSnapshot.environmentId,
286-
environmentType: latestSnapshot.environmentType,
287-
projectId: latestSnapshot.projectId,
288-
organizationId: latestSnapshot.organizationId,
289-
},
290-
});
291-
292-
// Complete the waitpoint
293-
if (run.associatedWaitpoint) {
294-
await this.waitpointSystem.completeWaitpoint({
295-
id: run.associatedWaitpoint.id,
296-
output: { value: JSON.stringify(error), isError: true },
285+
error: e,
297286
});
298287
}
299-
300-
// Emit event
301-
this.$.eventBus.emit("runExpired", {
302-
run: {
303-
id: run.id,
304-
spanId: run.spanId,
305-
ttl: run.ttl,
306-
taskEventStore: run.taskEventStore,
307-
createdAt: run.createdAt,
308-
updatedAt: now,
309-
completedAt: now,
310-
expiredAt: now,
311-
status: "EXPIRED" as TaskRunStatus,
312-
},
313-
time: now,
314-
organization: { id: run.runtimeEnvironment.organizationId },
315-
project: { id: run.runtimeEnvironment.projectId },
316-
environment: { id: run.runtimeEnvironment.id },
317-
});
318-
319-
expired.push(run.id);
320-
})
288+
},
289+
{ concurrency: 10, stopOnError: false }
321290
);
322291

323292
span.setAttribute("expiredCount", expired.length);

0 commit comments

Comments
 (0)