Skip to content

Commit aaea8d6

Browse files
committed
Add queue length limits at the queue level, lazy waitpoint creation, new ttl system
1 parent 058b5db commit aaea8d6

File tree

31 files changed

+3297
-228
lines changed

31 files changed

+3297
-228
lines changed

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ const EnvironmentSchema = z
535535

536536
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
537537
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
538-
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(30_000), // 30 seconds
538+
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(1_000), // 1 second
539539
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
540540
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
541541
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
@@ -593,6 +593,12 @@ const EnvironmentSchema = z
593593
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
594594
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),
595595

596+
// TTL System settings for automatic run expiration
597+
RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false),
598+
RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
599+
RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
600+
RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),
601+
596602
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
597603
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
598604
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),

apps/webapp/app/presenters/v3/LimitsPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ export class LimitsPresenter extends BasePresenter {
312312
},
313313
queueSize: {
314314
name: "Max queued runs",
315-
description: "Maximum pending runs across all queues in this environment",
315+
description: "Maximum pending runs per individual queue in this environment",
316316
limit: getQueueSizeLimit(environmentType, organization),
317317
currentUsage: currentQueueSize,
318318
source: getQueueSizeLimitSource(environmentType, organization),

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.limits/route.tsx

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -556,9 +556,12 @@ function QuotaRow({
556556
billingPath: string;
557557
}) {
558558
// For log retention, we don't show current usage as it's a duration, not a count
559+
// For queue size, we don't show current usage as the limit is per-queue, not environment-wide
559560
const isRetentionQuota = quota.name === "Log retention";
561+
const isQueueSizeQuota = quota.name === "Max queued runs";
562+
const hideCurrentUsage = isRetentionQuota || isQueueSizeQuota;
560563
const percentage =
561-
!isRetentionQuota && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;
564+
!hideCurrentUsage && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;
562565

563566
// Special handling for Log retention
564567
if (quota.name === "Log retention") {
@@ -657,10 +660,10 @@ function QuotaRow({
657660
alignment="right"
658661
className={cn(
659662
"tabular-nums",
660-
isRetentionQuota ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
663+
hideCurrentUsage ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
661664
)}
662665
>
663-
{isRetentionQuota ? "–" : formatNumber(quota.currentUsage)}
666+
{hideCurrentUsage ? "–" : formatNumber(quota.currentUsage)}
664667
</TableCell>
665668
<TableCell alignment="right">
666669
<SourceBadge source={quota.source} />

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx

Lines changed: 16 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ import { EnvironmentQueuePresenter } from "~/presenters/v3/EnvironmentQueuePrese
6868
import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server";
6969
import { requireUserId } from "~/services/session.server";
7070
import { cn } from "~/utils/cn";
71-
import { formatNumberCompact } from "~/utils/numberFormatter";
7271
import {
7372
concurrencyPath,
7473
docsPath,
@@ -346,13 +345,7 @@ export default function Page() {
346345
<BigNumber
347346
title="Queued"
348347
value={environment.queued}
349-
suffix={
350-
<QueuedSuffix
351-
queued={environment.queued}
352-
queueSizeLimit={environment.queueSizeLimit}
353-
isPaused={env.paused}
354-
/>
355-
}
348+
suffix={env.paused ? <span className="text-warning">paused</span> : undefined}
356349
animate
357350
accessory={
358351
<div className="flex items-start gap-1">
@@ -371,10 +364,7 @@ export default function Page() {
371364
/>
372365
</div>
373366
}
374-
valueClassName={
375-
getQueueUsageColorClass(environment.queued, environment.queueSizeLimit) ??
376-
(env.paused ? "text-warning" : undefined)
377-
}
367+
valueClassName={env.paused ? "text-warning" : undefined}
378368
compactThreshold={1000000}
379369
/>
380370
<BigNumber
@@ -519,7 +509,10 @@ export default function Page() {
519509
{queues.length > 0 ? (
520510
queues.map((queue) => {
521511
const limit = queue.concurrencyLimit ?? environment.concurrencyLimit;
522-
const isAtLimit = queue.running >= limit;
512+
const isAtConcurrencyLimit = queue.running >= limit;
513+
const isAtQueueLimit =
514+
environment.queueSizeLimit !== null &&
515+
queue.queued >= environment.queueSizeLimit;
523516
const queueFilterableName = `${queue.type === "task" ? "task/" : ""}${
524517
queue.name
525518
}`;
@@ -545,7 +538,12 @@ export default function Page() {
545538
Paused
546539
</Badge>
547540
) : null}
548-
{isAtLimit ? (
541+
{isAtQueueLimit ? (
542+
<Badge variant="extra-small" className="text-error">
543+
At queue limit
544+
</Badge>
545+
) : null}
546+
{isAtConcurrencyLimit ? (
549547
<Badge variant="extra-small" className="text-warning">
550548
At concurrency limit
551549
</Badge>
@@ -556,7 +554,8 @@ export default function Page() {
556554
alignment="right"
557555
className={cn(
558556
"w-[1%] pl-16 tabular-nums",
559-
queue.paused ? "opacity-50" : undefined
557+
queue.paused ? "opacity-50" : undefined,
558+
isAtQueueLimit && "text-error"
560559
)}
561560
>
562561
{queue.queued}
@@ -567,7 +566,7 @@ export default function Page() {
567566
"w-[1%] pl-16 tabular-nums",
568567
queue.paused ? "opacity-50" : undefined,
569568
queue.running > 0 && "text-text-bright",
570-
isAtLimit && "text-warning"
569+
isAtConcurrencyLimit && "text-warning"
571570
)}
572571
>
573572
{queue.running}
@@ -587,7 +586,7 @@ export default function Page() {
587586
className={cn(
588587
"w-[1%] pl-16",
589588
queue.paused ? "opacity-50" : undefined,
590-
isAtLimit && "text-warning",
589+
isAtConcurrencyLimit && "text-warning",
591590
queue.concurrency?.overriddenAt && "font-medium text-text-bright"
592591
)}
593592
>
@@ -1129,52 +1128,3 @@ function BurstFactorTooltip({
11291128
);
11301129
}
11311130

1132-
function getQueueUsageColorClass(current: number, limit: number | null): string | undefined {
1133-
if (!limit) return undefined;
1134-
const percentage = current / limit;
1135-
if (percentage >= 1) return "text-error";
1136-
if (percentage >= 0.9) return "text-warning";
1137-
return undefined;
1138-
}
1139-
1140-
/**
1141-
* Renders the suffix for the Queued BigNumber, showing:
1142-
* - The limit with usage color and tooltip (if queueSizeLimit is set)
1143-
* - "paused" text (if environment is paused)
1144-
* - Both indicators when applicable
1145-
*/
1146-
function QueuedSuffix({
1147-
queued,
1148-
queueSizeLimit,
1149-
isPaused,
1150-
}: {
1151-
queued: number;
1152-
queueSizeLimit: number | null;
1153-
isPaused: boolean;
1154-
}) {
1155-
const showLimit = queueSizeLimit !== null;
1156-
1157-
if (!showLimit && !isPaused) {
1158-
return null;
1159-
}
1160-
1161-
return (
1162-
<span className="flex items-center gap-1">
1163-
{showLimit && (
1164-
<>
1165-
<span className="text-text-dimmed">/</span>
1166-
<span className={getQueueUsageColorClass(queued, queueSizeLimit)}>
1167-
{formatNumberCompact(queueSizeLimit)}
1168-
</span>
1169-
<InfoIconTooltip
1170-
content="Maximum pending runs across all queues in this environment"
1171-
contentClassName="max-w-xs"
1172-
/>
1173-
</>
1174-
)}
1175-
{isPaused && (
1176-
<span className="text-warning">{showLimit ? "(paused)" : "paused"}</span>
1177-
)}
1178-
</span>
1179-
);
1180-
}

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,26 @@ export class IdempotencyKeyConcern {
7979
}
8080

8181
// We have an idempotent run, so we return it
82-
const associatedWaitpoint = existingRun.associatedWaitpoint;
8382
const parentRunId = request.body.options?.parentRunId;
8483
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;
84+
8585
//We're using `andWait` so we need to block the parent run with a waitpoint
86-
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
86+
if (resumeParentOnCompletion && parentRunId) {
87+
// Get or create waitpoint lazily (existing run may not have one if it was standalone)
88+
let associatedWaitpoint = existingRun.associatedWaitpoint;
89+
if (!associatedWaitpoint) {
90+
associatedWaitpoint = await this.engine.getOrCreateRunWaitpoint({
91+
runId: existingRun.id,
92+
projectId: request.environment.projectId,
93+
environmentId: request.environment.id,
94+
});
95+
}
96+
97+
// If run already completed, return without blocking
98+
if (!associatedWaitpoint) {
99+
return { isCached: true, run: existingRun };
100+
}
101+
87102
await this.traceEventConcern.traceIdempotentRun(
88103
request,
89104
parentStore,
@@ -98,13 +113,13 @@ export class IdempotencyKeyConcern {
98113
request.options?.parentAsLinkType === "replay"
99114
? event.spanId
100115
: event.traceparent?.spanId
101-
? `${event.traceparent.spanId}:${event.spanId}`
102-
: event.spanId;
116+
? `${event.traceparent.spanId}:${event.spanId}`
117+
: event.spanId;
103118

104119
//block run with waitpoint
105120
await this.engine.blockRunWithWaitpoint({
106121
runId: RunId.fromFriendlyId(parentRunId),
107-
waitpoints: associatedWaitpoint.id,
122+
waitpoints: associatedWaitpoint!.id,
108123
spanIdToComplete: spanId,
109124
batch: request.options?.batchId
110125
? {

0 commit comments

Comments
 (0)