Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ BEGIN
false
) as _broadcast_result
),
-- ---------- Archive queued/started task messages for skipped steps ----------
archived_messages AS (
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
FROM pgflow.step_tasks st
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0
),
Comment on lines +94 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The archived_messages CTE is defined but never referenced or used. This creates duplicate archiving logic since lines 115-123 perform the same archiving operation again after the CTE chain completes. This causes messages to be archived twice or the first archiving to fail if no messages match at CTE execution time.

-- Remove the unused CTE entirely:
-- archived_messages AS (
--   SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
--   ...
-- ),

-- Keep only the PERFORM statement at lines 115-123

The CTE result is never consumed (no SELECT * FROM archived_messages or similar), making it dead code that still executes and potentially conflicts with the later PERFORM statement.

Suggested change
archived_messages AS (
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
FROM pgflow.step_tasks st
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0
),
-- archived_messages CTE removed as it was redundant with later archiving code

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

-- ---------- Update run counters ----------
run_updates AS (
UPDATE pgflow.runs r
Expand All @@ -100,6 +110,18 @@ BEGIN
)
SELECT COUNT(*) INTO v_total_skipped FROM skipped;

-- Archive queued/started task messages for all steps that were just skipped
-- (query step_states since CTE state is no longer accessible)
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
AND ss.status = 'skipped'
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
HAVING COUNT(st.message_id) > 0;

RETURN v_total_skipped;
END;
$$;
24 changes: 24 additions & 0 deletions pkgs/core/schemas/0100_function_complete_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE;

-- ==========================================
-- GUARD: Late callback - step not started
-- ==========================================
-- If the step is not in 'started' state, this is a late callback.
-- Do not mutate step_states or runs, archive message, return task row.
IF v_step_record.status != 'started' THEN
-- Archive the task message if present (prevents stuck work)
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return the current task row without any mutations
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;

-- Check for type violations AFTER acquiring locks
SELECT child_step.step_slug INTO v_dependent_map_slug
FROM pgflow.deps dependency
Expand Down
53 changes: 49 additions & 4 deletions pkgs/core/schemas/0100_function_fail_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ DECLARE
v_step_failed boolean;
v_step_skipped boolean;
v_when_exhausted text;
v_task_exhausted boolean; -- True if task has exhausted retries
v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip
v_task_exhausted boolean;
v_flow_slug_for_deps text;
v_prev_step_status text;
v_flow_slug text;
begin

-- If run is already failed, no retries allowed
Expand Down Expand Up @@ -47,6 +49,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
RETURN;
END IF;

-- Late callback guard: if step is not 'started', don't mutate step/run state
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
SELECT ss.status INTO v_prev_step_status
FROM pgflow.step_states ss
WHERE ss.run_id = fail_task.run_id
AND ss.step_slug = fail_task.step_slug;

IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
-- Archive the task message if present
SELECT r.flow_slug INTO v_flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id;

PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0;

RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;

WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
Expand Down Expand Up @@ -152,9 +182,13 @@ run_update AS (
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
-- Decrement remaining_steps only on FIRST transition to skipped
-- (not when step was already skipped and a second task fails)
-- Uses PL/pgSQL variable captured before CTE chain
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
WHEN (select status from maybe_fail_step) = 'skipped'
AND v_prev_step_status != 'skipped'
THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
Expand Down Expand Up @@ -193,6 +227,17 @@ END IF;

-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_task_exhausted AND v_step_skipped THEN
-- Archive all queued/started sibling task messages for this step
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;

-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
Expand Down
10 changes: 8 additions & 2 deletions pkgs/core/schemas/0120_function_start_tasks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ as $$
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
and r.status = 'started'
and exists (
select 1
from pgflow.step_states ss
where ss.run_id = task.run_id
and ss.step_slug = task.step_slug
and ss.status = 'started'
)
),
start_tasks_update as (
update pgflow.step_tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,16 @@ BEGIN
false
) as _broadcast_result
),
-- ---------- Archive queued/started task messages for skipped steps ----------
archived_messages AS (
SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
FROM pgflow.step_tasks st
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0
),
-- ---------- Update run counters ----------
run_updates AS (
UPDATE pgflow.runs r
Expand All @@ -422,6 +432,18 @@ BEGIN
)
SELECT COUNT(*) INTO v_total_skipped FROM skipped;

-- Archive queued/started task messages for all steps that were just skipped
-- (query step_states since CTE state is no longer accessible)
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
AND ss.status = 'skipped'
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
HAVING COUNT(st.message_id) > 0;

RETURN v_total_skipped;
END;
$$;
Expand Down Expand Up @@ -890,6 +912,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id
AND pgflow.step_states.step_slug = complete_task.step_slug
FOR UPDATE;

-- ==========================================
-- GUARD: Late callback - step not started
-- ==========================================
-- If the step is not in 'started' state, this is a late callback.
-- Do not mutate step_states or runs, archive message, return task row.
IF v_step_record.status != 'started' THEN
-- Archive the task message if present (prevents stuck work)
PERFORM pgmq.archive(
v_run_record.flow_slug,
st.message_id
)
FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;
-- Return the current task row without any mutations
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = complete_task.run_id
AND pgflow.step_tasks.step_slug = complete_task.step_slug
AND pgflow.step_tasks.task_index = complete_task.task_index;
RETURN;
END IF;

-- Check for type violations AFTER acquiring locks
SELECT child_step.step_slug INTO v_dependent_map_slug
FROM pgflow.deps dependency
Expand Down Expand Up @@ -1246,8 +1292,10 @@ DECLARE
v_step_failed boolean;
v_step_skipped boolean;
v_when_exhausted text;
v_task_exhausted boolean; -- True if task has exhausted retries
v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip
v_task_exhausted boolean;
v_flow_slug_for_deps text;
v_prev_step_status text;
v_flow_slug text;
begin

-- If run is already failed, no retries allowed
Expand Down Expand Up @@ -1279,6 +1327,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
RETURN;
END IF;

-- Late callback guard: if step is not 'started', don't mutate step/run state
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
SELECT ss.status INTO v_prev_step_status
FROM pgflow.step_states ss
WHERE ss.run_id = fail_task.run_id
AND ss.step_slug = fail_task.step_slug;

IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
-- Archive the task message if present
SELECT r.flow_slug INTO v_flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id;

PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.task_index = fail_task.task_index
AND st.message_id IS NOT NULL
HAVING COUNT(st.message_id) > 0;

RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;

WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
Expand Down Expand Up @@ -1384,9 +1460,13 @@ run_update AS (
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
-- Decrement remaining_steps only on FIRST transition to skipped
-- (not when step was already skipped and a second task fails)
-- Uses PL/pgSQL variable captured before CTE chain
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
WHEN (select status from maybe_fail_step) = 'skipped'
AND v_prev_step_status != 'skipped'
THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
Expand Down Expand Up @@ -1425,6 +1505,17 @@ END IF;

-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_task_exhausted AND v_step_skipped THEN
-- Archive all queued/started sibling task messages for this step
PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.step_slug = fail_task.step_slug
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;

-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
Expand Down Expand Up @@ -1717,8 +1808,14 @@ with tasks as (
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
and r.status = 'started'
and exists (
select 1
from pgflow.step_states ss
where ss.run_id = task.run_id
and ss.step_slug = task.step_slug
and ss.status = 'started'
)
),
start_tasks_update as (
update pgflow.step_tasks
Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:NOsAYua/Juse0euNM4usm7M9DDPL7Btt5YvbexpfllA=
h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -18,4 +18,4 @@ h1:NOsAYua/Juse0euNM4usm7M9DDPL7Btt5YvbexpfllA=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
20260206115746_pgflow_step_conditions.sql h1:rGmG0hwC40AyEoofwX9Pj1b6DNiPG+pX9aLjEMgIoSQ=
20260214181656_pgflow_step_conditions.sql h1:uLPoOD/hPrerMACS6CThb7t7T5LKLpMMrdFXXi4ZQ5s=
Loading