From 714b3b6f604df0267a6eb8f64611c722db795d93 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Sat, 14 Feb 2026 03:24:18 +0100 Subject: [PATCH] fix missing skip archival --- ...100_function__cascade_force_skip_steps.sql | 22 ++++ .../schemas/0100_function_complete_task.sql | 24 ++++ pkgs/core/schemas/0100_function_fail_task.sql | 53 ++++++++- .../schemas/0120_function_start_tasks.sql | 10 +- ...20260214181656_pgflow_step_conditions.sql} | 109 +++++++++++++++++- pkgs/core/supabase/migrations/atlas.sum | 4 +- ...s_task_messages_for_skipped_steps.test.sql | 65 +++++++++++ .../broadcast_order.test.sql | 2 +- ..._skip_does_not_mutate_step_or_run.test.sql | 90 +++++++++++++++ ...met_fail_archives_active_messages.test.sql | 19 ++- ..._double_decrement_remaining_steps.test.sql | 72 ++++++++++++ ...ayed_fail_task_skip_is_idempotent.test.sql | 16 ++- .../skip_archives_sibling_messages.test.sql | 89 ++++++++++++++ .../skip_partial_deps_waits.test.sql | 16 ++- .../type_violation_always_hard_fails.test.sql | 8 +- ...ted_skip_cascade_skips_dependents.test.sql | 16 ++- .../when_exhausted_skip_skips_step.test.sql | 16 ++- ..._not_start_tasks_for_skipped_step.test.sql | 76 ++++++++++++ 18 files changed, 678 insertions(+), 29 deletions(-) rename pkgs/core/supabase/migrations/{20260206115746_pgflow_step_conditions.sql => 20260214181656_pgflow_step_conditions.sql} (94%) create mode 100644 pkgs/core/supabase/tests/_cascade_force_skip_steps/archives_task_messages_for_skipped_steps.test.sql create mode 100644 pkgs/core/supabase/tests/complete_task/late_complete_after_skip_does_not_mutate_step_or_run.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/late_fail_after_skip_does_not_double_decrement_remaining_steps.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_archives_sibling_messages.test.sql create mode 100644 pkgs/core/supabase/tests/start_tasks/does_not_start_tasks_for_skipped_step.test.sql diff --git a/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql b/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql index b9f31e65a..4ca27be34 100644 --- a/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql +++ b/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql @@ -90,6 +90,16 @@ BEGIN false ) as _broadcast_result ), + -- ---------- Archive queued/started task messages for skipped steps ---------- + archived_messages AS ( + SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result + FROM pgflow.step_tasks st + WHERE st.run_id = _cascade_force_skip_steps.run_id + AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk) + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING COUNT(st.message_id) > 0 + ), -- ---------- Update run counters ---------- run_updates AS ( UPDATE pgflow.runs r @@ -100,6 +110,18 @@ BEGIN ) SELECT COUNT(*) INTO v_total_skipped FROM skipped; + -- Archive queued/started task messages for all steps that were just skipped + -- (query step_states since CTE state is no longer accessible) + PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug + WHERE st.run_id = _cascade_force_skip_steps.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + AND ss.status = 'skipped' + AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped + HAVING COUNT(st.message_id) > 0; + RETURN v_total_skipped; END; $$; diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index f4d5242d4..884bd54e1 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -40,6 +40,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug FOR UPDATE; +-- ========================================== +-- GUARD: Late callback - step not started +-- ========================================== +-- If the step is not in 'started' state, this is a late callback. +-- Do not mutate step_states or runs, archive message, return task row. +IF v_step_record.status != 'started' THEN + -- Archive the task message if present (prevents stuck work) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + -- Return the current task row without any mutations + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + -- Check for type violations AFTER acquiring locks SELECT child_step.step_slug INTO v_dependent_map_slug FROM pgflow.deps dependency diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index 9eac65d60..029b87d14 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -14,8 +14,10 @@ DECLARE v_step_failed boolean; v_step_skipped boolean; v_when_exhausted text; - v_task_exhausted boolean; -- True if task has exhausted retries - v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip + v_task_exhausted boolean; + v_flow_slug_for_deps text; + v_prev_step_status text; + v_flow_slug text; begin -- If run is already failed, no retries allowed @@ -47,6 +49,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id RETURN; END IF; +-- Late callback guard: if step is not 'started', don't mutate step/run state +-- Capture previous status BEFORE any CTE updates (for transition-based decrement) +SELECT ss.status INTO v_prev_step_status +FROM pgflow.step_states ss +WHERE ss.run_id = fail_task.run_id + AND ss.step_slug = fail_task.step_slug; + +IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN + -- Archive the task message if present + SELECT r.flow_slug INTO v_flow_slug + FROM pgflow.runs r + WHERE r.run_id = fail_task.run_id; + + PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + WITH run_lock AS ( SELECT * FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id @@ -152,9 +182,13 @@ run_update AS ( WHEN (select status from maybe_fail_step) = 'failed' THEN now() ELSE NULL END, - -- Decrement remaining_steps when step was skipped (not failed, run continues) + -- Decrement remaining_steps only on FIRST transition to skipped + -- (not when step was already skipped and a second task fails) + -- Uses PL/pgSQL variable captured before CTE chain remaining_steps = CASE - WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + WHEN (select status from maybe_fail_step) = 'skipped' + AND v_prev_step_status != 'skipped' + THEN pgflow.runs.remaining_steps - 1 ELSE pgflow.runs.remaining_steps END WHERE pgflow.runs.run_id = fail_task.run_id @@ -193,6 +227,17 @@ END IF; -- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') IF v_task_exhausted AND v_step_skipped THEN + -- Archive all queued/started sibling task messages for this step + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + -- Send broadcast event for step skipped PERFORM realtime.send( jsonb_build_object( diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql index 627497e3a..c9e367d16 100644 --- a/pkgs/core/schemas/0120_function_start_tasks.sql +++ b/pkgs/core/schemas/0120_function_start_tasks.sql @@ -20,8 +20,14 @@ as $$ where task.flow_slug = start_tasks.flow_slug and task.message_id = any(msg_ids) and task.status = 'queued' - -- MVP: Don't start tasks on failed runs - and r.status != 'failed' + and r.status = 'started' + and exists ( + select 1 + from pgflow.step_states ss + where ss.run_id = task.run_id + and ss.step_slug = task.step_slug + and ss.status = 'started' + ) ), start_tasks_update as ( update pgflow.step_tasks diff --git a/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql similarity index 94% rename from pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql rename to pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql index 1486a48b5..1214dbeeb 100644 --- a/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql @@ -412,6 +412,16 @@ BEGIN false ) as _broadcast_result ), + -- ---------- Archive queued/started task messages for skipped steps ---------- + archived_messages AS ( + SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result + FROM pgflow.step_tasks st + WHERE st.run_id = _cascade_force_skip_steps.run_id + AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk) + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING COUNT(st.message_id) > 0 + ), -- ---------- Update run counters ---------- run_updates AS ( UPDATE pgflow.runs r @@ -422,6 +432,18 @@ BEGIN ) SELECT COUNT(*) INTO v_total_skipped FROM skipped; + -- Archive queued/started task messages for all steps that were just skipped + -- (query step_states since CTE state is no longer accessible) + PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug + WHERE st.run_id = _cascade_force_skip_steps.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + AND ss.status = 'skipped' + AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped + HAVING COUNT(st.message_id) > 0; + RETURN v_total_skipped; END; $$; @@ -890,6 +912,30 @@ WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug FOR UPDATE; +-- ========================================== +-- GUARD: Late callback - step not started +-- ========================================== +-- If the step is not in 'started' state, this is a late callback. +-- Do not mutate step_states or runs, archive message, return task row. +IF v_step_record.status != 'started' THEN + -- Archive the task message if present (prevents stuck work) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + -- Return the current task row without any mutations + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + -- Check for type violations AFTER acquiring locks SELECT child_step.step_slug INTO v_dependent_map_slug FROM pgflow.deps dependency @@ -1246,8 +1292,10 @@ DECLARE v_step_failed boolean; v_step_skipped boolean; v_when_exhausted text; - v_task_exhausted boolean; -- True if task has exhausted retries - v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip + v_task_exhausted boolean; + v_flow_slug_for_deps text; + v_prev_step_status text; + v_flow_slug text; begin -- If run is already failed, no retries allowed @@ -1279,6 +1327,34 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id RETURN; END IF; +-- Late callback guard: if step is not 'started', don't mutate step/run state +-- Capture previous status BEFORE any CTE updates (for transition-based decrement) +SELECT ss.status INTO v_prev_step_status +FROM pgflow.step_states ss +WHERE ss.run_id = fail_task.run_id + AND ss.step_slug = fail_task.step_slug; + +IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN + -- Archive the task message if present + SELECT r.flow_slug INTO v_flow_slug + FROM pgflow.runs r + WHERE r.run_id = fail_task.run_id; + + PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + WITH run_lock AS ( SELECT * FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id @@ -1384,9 +1460,13 @@ run_update AS ( WHEN (select status from maybe_fail_step) = 'failed' THEN now() ELSE NULL END, - -- Decrement remaining_steps when step was skipped (not failed, run continues) + -- Decrement remaining_steps only on FIRST transition to skipped + -- (not when step was already skipped and a second task fails) + -- Uses PL/pgSQL variable captured before CTE chain remaining_steps = CASE - WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + WHEN (select status from maybe_fail_step) = 'skipped' + AND v_prev_step_status != 'skipped' + THEN pgflow.runs.remaining_steps - 1 ELSE pgflow.runs.remaining_steps END WHERE pgflow.runs.run_id = fail_task.run_id @@ -1425,6 +1505,17 @@ END IF; -- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') IF v_task_exhausted AND v_step_skipped THEN + -- Archive all queued/started sibling task messages for this step + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + -- Send broadcast event for step skipped PERFORM realtime.send( jsonb_build_object( @@ -1717,8 +1808,14 @@ with tasks as ( where task.flow_slug = start_tasks.flow_slug and task.message_id = any(msg_ids) and task.status = 'queued' - -- MVP: Don't start tasks on failed runs - and r.status != 'failed' + and r.status = 'started' + and exists ( + select 1 + from pgflow.step_states ss + where ss.run_id = task.run_id + and ss.step_slug = task.step_slug + and ss.status = 'started' + ) ), start_tasks_update as ( update pgflow.step_tasks diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 3b23e3e8c..685d7c457 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:NOsAYua/Juse0euNM4usm7M9DDPL7Btt5YvbexpfllA= +h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,4 +18,4 @@ h1:NOsAYua/Juse0euNM4usm7M9DDPL7Btt5YvbexpfllA= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= -20260206115746_pgflow_step_conditions.sql h1:rGmG0hwC40AyEoofwX9Pj1b6DNiPG+pX9aLjEMgIoSQ= +20260214181656_pgflow_step_conditions.sql h1:uLPoOD/hPrerMACS6CThb7t7T5LKLpMMrdFXXi4ZQ5s= diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/archives_task_messages_for_skipped_steps.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/archives_task_messages_for_skipped_steps.test.sql new file mode 100644 index 000000000..438b20052 --- /dev/null +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/archives_task_messages_for_skipped_steps.test.sql @@ -0,0 +1,65 @@ +\set ON_ERROR_STOP on +\set QUIET on + +begin; +select plan(5); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('cascade_skip_archive'); +select pgflow.add_step('cascade_skip_archive', 'map_a', '{}', max_attempts=>0, step_type=>'map', when_exhausted=>'skip'); +select pgflow.add_step('cascade_skip_archive', 'other', '{}'); + +select pgflow.start_flow('cascade_skip_archive', '[1, 2, 3]'::jsonb); + +with tasks as ( + select message_id, task_index + from pgflow.step_tasks + where flow_slug = 'cascade_skip_archive' and step_slug = 'map_a' + order by task_index +) +select pgflow.start_tasks('cascade_skip_archive', array[(select message_id from tasks where task_index = 0)::bigint], pgflow_tests.ensure_worker('cascade_skip_archive')); + +select ok( + (select count(*) = 3 from pgflow.step_tasks + where flow_slug = 'cascade_skip_archive' and step_slug = 'map_a'), + 'Setup: map_a should have 3 tasks' +); + +select ok( + (select count(*) >= 1 from pgmq.q_cascade_skip_archive), + 'Setup: queue should have messages' +); + +select pgflow._cascade_force_skip_steps( + (select run_id from pgflow.runs where flow_slug = 'cascade_skip_archive'), + 'map_a', + 'condition_unmet' +); + +select is( + (select status from pgflow.step_states where flow_slug = 'cascade_skip_archive' and step_slug = 'map_a'), + 'skipped'::text, + 'Step state should be skipped' +); + +select is_empty( + $$ + select 1 + from pgmq.q_cascade_skip_archive q + join pgflow.step_tasks st on st.message_id = q.msg_id + where st.flow_slug = 'cascade_skip_archive' + and st.step_slug = 'map_a' + $$, + 'Queue should have 0 messages for skipped map_a step after _cascade_force_skip_steps' +); + +select ok( + (select count(*) >= 3 from pgmq.a_cascade_skip_archive a + join pgflow.step_tasks st on st.message_id = a.msg_id + where st.flow_slug = 'cascade_skip_archive' and st.step_slug = 'map_a'), + 'Archive should contain all 3 map_a task messages' +); + +select * from finish(); +rollback; diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql index 012f3368e..06083db2b 100644 --- a/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql @@ -37,7 +37,7 @@ with ordered_events as ( select inserted_at, payload->>'step_slug' as step_slug, - row_number() over (order by inserted_at) as event_order + row_number() over (order by inserted_at, payload->>'step_slug') as event_order from realtime.messages where payload->>'event_type' = 'step:skipped' and payload->>'run_id' = (select run_id::text from run_ids) diff --git a/pkgs/core/supabase/tests/complete_task/late_complete_after_skip_does_not_mutate_step_or_run.test.sql b/pkgs/core/supabase/tests/complete_task/late_complete_after_skip_does_not_mutate_step_or_run.test.sql new file mode 100644 index 000000000..ef22a6cf3 --- /dev/null +++ b/pkgs/core/supabase/tests/complete_task/late_complete_after_skip_does_not_mutate_step_or_run.test.sql @@ -0,0 +1,90 @@ +-- Test: Late complete after step is skipped should not mutate step or run state +-- Verifies defense-in-depth: callbacks cannot change state after step is no longer started +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Setup: Create flow with map_a (skip on exhaust) and independent 'other' step +select pgflow.create_flow('late_complete_test'); +select pgflow.add_step( + flow_slug => 'late_complete_test', + step_slug => 'map_a', + step_type => 'map', + max_attempts => 0, + when_exhausted => 'skip' +); +select pgflow.add_step( + flow_slug => 'late_complete_test', + step_slug => 'other' +); + +-- Start flow with 2 array elements for map_a (root map gets array directly) +select run_id as test_run_id from pgflow.start_flow('late_complete_test', '[1, 2]'::jsonb) \gset + +-- Ensure worker exists +select pgflow_tests.ensure_worker('late_complete_test') as test_worker_id \gset + +-- Start both map_a tasks +select message_id as msg_0 from pgflow.step_tasks +where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and task_index = 0 \gset + +select pgflow.start_tasks('late_complete_test', array[:'msg_0'::bigint], :'test_worker_id'::uuid); + +select message_id as msg_1 from pgflow.step_tasks +where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and task_index = 1 \gset + +select pgflow.start_tasks('late_complete_test', array[:'msg_1'::bigint], :'test_worker_id'::uuid); + +-- Fail map_a[0] to trigger skip (max_attempts=0, when_exhausted='skip') +-- This makes the step transition to 'skipped' +select pgflow.fail_task( + :'test_run_id'::uuid, + 'map_a', + 0, + 'Task 0 failed!' +); + +-- Verify step became skipped +select is( + (select status from pgflow.step_states + where run_id = :'test_run_id'::uuid and step_slug = 'map_a'), + 'skipped', + 'Step should be skipped after fail with when_exhausted=skip' +); + +-- Capture remaining_steps after skip +select remaining_steps as remaining_steps_after_skip +from pgflow.runs +where run_id = :'test_run_id'::uuid \gset + +-- LATE COMPLETE: Try to complete map_a[1] after step is already skipped +-- This should NOT mutate step or run state +select lives_ok( + format($$ + select pgflow.complete_task( + '%s'::uuid, + 'map_a', + 1, + '{"ok":true}'::jsonb + ) + $$, :'test_run_id'), + 'Late complete should not error' +); + +-- Verify step state unchanged (remains skipped) +select is( + (select status from pgflow.step_states + where run_id = :'test_run_id'::uuid and step_slug = 'map_a'), + 'skipped', + 'Step should remain skipped after late complete' +); + +-- Verify remaining_steps unchanged by late complete +select is( + (select remaining_steps from pgflow.runs where run_id = :'test_run_id'::uuid), + :remaining_steps_after_skip, + 'remaining_steps should not be decremented by late complete' +); + +select * from finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql index b43dcf413..03944f9fa 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql @@ -26,13 +26,20 @@ with run as ( ) select run_id into temporary run_ids from run; -select pgflow_tests.read_and_start('dependent_fail_archive'); - +with started as ( + select * from pgflow_tests.read_and_start('dependent_fail_archive', qty => 10) +), +target as ( + select run_id, step_slug, task_index + from started + where step_slug = 'first' + limit 1 +) select pgflow.complete_task( - run_id => (select run_id from run_ids), - step_slug => 'first', - task_index => 0, - output => '{"ok": false}'::jsonb + (select run_id from target), + (select step_slug from target), + (select task_index from target), + '{"ok": false}'::jsonb ); select is( diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/late_fail_after_skip_does_not_double_decrement_remaining_steps.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/late_fail_after_skip_does_not_double_decrement_remaining_steps.test.sql new file mode 100644 index 000000000..e3b774a8c --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/late_fail_after_skip_does_not_double_decrement_remaining_steps.test.sql @@ -0,0 +1,72 @@ +begin; +select plan(6); +select pgflow_tests.reset_db(); + +-- Setup: Create a flow with map step (max_attempts=>0, when_exhausted=>'skip') and 'other' step +select pgflow.create_flow('double_decrement_test'); +select pgflow.add_step('double_decrement_test', 'map_a', '{}', max_attempts=>0, step_type=>'map', when_exhausted=>'skip'); +select pgflow.add_step('double_decrement_test', 'other', '{}'); + +-- Start run with 2 items for map_a +select pgflow.start_flow('double_decrement_test', '[1,2]'::jsonb); +select is(count(*), 2::bigint, 'map_a has 2 tasks') from pgflow.step_tasks where step_slug = 'map_a'; + +-- Create worker +select pgflow_tests.ensure_worker('double_decrement_test', '00000000-0000-0000-0000-000000000001'::uuid, 'handler'); + +-- Start both map_a tasks +select pgflow.start_tasks( + 'double_decrement_test', + (select array_agg(message_id) from pgflow.step_tasks st join pgflow.runs r on st.run_id = r.run_id where r.flow_slug = 'double_decrement_test' and st.step_slug = 'map_a'), + '00000000-0000-0000-0000-000000000001'::uuid +); +select is(count(*), 2::bigint, 'Both map_a tasks are started') from pgflow.step_tasks st join pgflow.runs r on st.run_id = r.run_id where r.flow_slug = 'double_decrement_test' and st.step_slug = 'map_a' and st.status = 'started'; + +-- Start 'other' task (to keep run alive) +select pgflow.start_tasks( + 'double_decrement_test', + (select array_agg(message_id) from pgflow.step_tasks st join pgflow.runs r on st.run_id = r.run_id where r.flow_slug = 'double_decrement_test' and st.step_slug = 'other'), + '00000000-0000-0000-0000-000000000001'::uuid +); + +-- Capture remaining_steps BEFORE first fail +create temp table baseline as +select remaining_steps, run_id from pgflow.runs where flow_slug = 'double_decrement_test'; + +-- Fail map_a[0] -> step becomes skipped +select pgflow.fail_task(baseline.run_id, 'map_a', 0, 'First task failed') from baseline; + +-- Capture remaining_steps AFTER first skip +create temp table after_first_skip as +select r.remaining_steps, r.run_id from pgflow.runs r, baseline b where r.run_id = b.run_id; + +-- Verify step is skipped +select is( + (select ss.status from pgflow.step_states ss, after_first_skip a where ss.run_id = a.run_id and ss.step_slug = 'map_a'), + 'skipped', + 'map_a step is skipped after first task failure' +); + +-- Verify remaining_steps decreased +select ok( + (select a.remaining_steps < b.remaining_steps from after_first_skip a, baseline b where a.run_id = b.run_id), + 'remaining_steps decremented after first skip' +); + +-- Late-fail map_a[1] (after step is already skipped) +select lives_ok( + $$ + select pgflow.fail_task(a.run_id, 'map_a', 1, 'Late failure') from after_first_skip a + $$, + 'Late fail on skipped step does not error' +); + +-- Verify remaining_steps is UNCHANGED (no double-decrement) +select is( + (select r.remaining_steps from pgflow.runs r, after_first_skip a where r.run_id = a.run_id), + (select remaining_steps from after_first_skip), + 'remaining_steps unchanged after late fail (no double-decrement)' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql index 729b37040..12c3a60fe 100644 --- a/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql @@ -31,7 +31,21 @@ with run as ( ) select run_id into temporary run_ids from run; -select pgflow_tests.poll_and_fail('replayed_skip_idempotent'); +with started as ( + select * from pgflow_tests.read_and_start('replayed_skip_idempotent', qty => 10) +), +target as ( + select run_id, step_slug, task_index + from started + where step_slug = 'a' + limit 1 +) +select pgflow.fail_task( + (select run_id from target), + (select step_slug from target), + (select task_index from target), + (select step_slug from target) || ' FAILED' +); select is( ( diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_archives_sibling_messages.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_archives_sibling_messages.test.sql new file mode 100644 index 000000000..19cf517a6 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_archives_sibling_messages.test.sql @@ -0,0 +1,89 @@ +-- Test: when_exhausted='skip' should archive all queued/started sibling task messages +-- Verifies that when a map step transitions to skipped, sibling messages are archived +begin; +select plan(6); +select pgflow_tests.reset_db(); + +-- Setup: Create flow with single root map step (max_attempts=0, when_exhausted='skip') +select pgflow.create_flow('skip_archive_test'); +select pgflow.add_step( + flow_slug => 'skip_archive_test', + step_slug => 'map_a', + step_type => 'map', + max_attempts => 0, + when_exhausted => 'skip' +); + +-- Start flow with 3 array elements (creates 3 tasks) +select run_id as test_run_id from pgflow.start_flow('skip_archive_test', '[1, 2, 3]'::jsonb) \gset + +-- Verify all 3 messages are in queue +select is( + (select count(*) from pgmq.q_skip_archive_test), + 3::bigint, + 'Should have 3 messages in queue initially' +); + +-- Ensure worker exists (returns worker_id uuid) +select pgflow_tests.ensure_worker('skip_archive_test') as test_worker_id \gset + +-- Start task 0 and task 1 (leave task 2 queued) +-- Get message_id for task 0 +select message_id as msg_0 from pgflow.step_tasks +where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and task_index = 0 \gset + +select pgflow.start_tasks('skip_archive_test', array[:'msg_0'::bigint], :'test_worker_id'::uuid); + +-- Get message_id for task 1 +select message_id as msg_1 from pgflow.step_tasks +where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and task_index = 1 \gset + +select pgflow.start_tasks('skip_archive_test', array[:'msg_1'::bigint], :'test_worker_id'::uuid); + +-- Verify: 2 started, 1 queued +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and status = 'started'), + 2, + 'Should have 2 started tasks' +); + +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = :'test_run_id'::uuid and step_slug = 'map_a' and status = 'queued'), + 1, + 'Should have 1 queued task' +); + +-- Fail task 0 (max_attempts=0 means immediate exhaustion -> step becomes skipped) +select pgflow.fail_task( + :'test_run_id'::uuid, + 'map_a', + 0, + 'Task 0 failed!' +); + +-- CRITICAL TEST: Queue should have 0 messages (all archived when step skipped) +select is( + (select count(*) from pgmq.q_skip_archive_test), + 0::bigint, + 'Queue should be empty - sibling messages archived when step skipped' +); + +-- Test: Verify messages were archived (1 from failed task, 2 from siblings) +select is( + (select count(*) from pgmq.a_skip_archive_test), + 3::bigint, + 'All 3 messages should be in archive (failed task + 2 siblings)' +); + +-- Test: Step should be skipped (not failed) +select is( + (select status from pgflow.step_states + where run_id = :'test_run_id'::uuid and step_slug = 'map_a'), + 'skipped', + 'Step should be skipped when when_exhausted=skip' +); + +select * from finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_partial_deps_waits.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_partial_deps_waits.test.sql index e8fdb05b5..09061dcfb 100644 --- a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_partial_deps_waits.test.sql +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_partial_deps_waits.test.sql @@ -36,7 +36,21 @@ select is( ); -- Poll and fail step_a (step_b is still running) -select pgflow_tests.poll_and_fail('partial_skip'); +with started as ( + select * from pgflow_tests.read_and_start('partial_skip', qty => 10) +), +target as ( + select run_id, step_slug, task_index + from started + where step_slug = 'step_a' + limit 1 +) +select pgflow.fail_task( + (select run_id from target), + (select step_slug from target), + (select task_index from target), + (select step_slug from target) || ' FAILED' +); -- Test 1: step_a should be skipped select is( diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/type_violation_always_hard_fails.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/type_violation_always_hard_fails.test.sql index b7d2029b1..1163092c6 100644 --- a/pkgs/core/supabase/tests/fail_task_when_exhausted/type_violation_always_hard_fails.test.sql +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/type_violation_always_hard_fails.test.sql @@ -46,11 +46,11 @@ select is( 'Run should be failed on TYPE_VIOLATION regardless of when_exhausted setting' ); --- TEST 4: step_b should NOT be skipped (run failed before cascade could happen) -select isnt( +-- TEST 4: step_b should be in 'created' state (run failed before cascade could happen) +select is( (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_b'), - 'skipped', - 'step_b should not be skipped - run failed before any cascade' + 'created', + 'step_b should be created - run failed before any cascade' ); select finish(); diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_cascade_skips_dependents.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_cascade_skips_dependents.test.sql index 8e6ecadbe..73fdeffc1 100644 --- a/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_cascade_skips_dependents.test.sql +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_cascade_skips_dependents.test.sql @@ -13,7 +13,21 @@ select pgflow.add_step('test_flow', 'step_d'); -- Independent step to verify ru -- Start flow and fail step_a's task select pgflow.start_flow('test_flow', '{}'::jsonb); -select pgflow_tests.poll_and_fail('test_flow'); +with started as ( + select * from pgflow_tests.read_and_start('test_flow', qty => 10) +), +target as ( + select run_id, step_slug, task_index + from started + where step_slug = 'step_a' + limit 1 +) +select pgflow.fail_task( + (select run_id from target), + (select step_slug from target), + (select task_index from target), + (select step_slug from target) || ' FAILED' +); -- TEST 1: step_a should be skipped (not failed) select is( diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_skips_step.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_skips_step.test.sql index d4f686c74..400c7d9c1 100644 --- a/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_skips_step.test.sql +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/when_exhausted_skip_skips_step.test.sql @@ -10,7 +10,21 @@ select pgflow.add_step('test_flow', 'step_b'); -- Independent step to verify ru -- Start flow and fail step_a's task select pgflow.start_flow('test_flow', '{}'::jsonb); -select pgflow_tests.poll_and_fail('test_flow'); +with started as ( + select * from pgflow_tests.read_and_start('test_flow', qty => 10) +), +target as ( + select run_id, step_slug, task_index + from started + where step_slug = 'step_a' + limit 1 +) +select pgflow.fail_task( + (select run_id from target), + (select step_slug from target), + (select task_index from target), + (select step_slug from target) || ' FAILED' +); -- TEST 1: Task should be failed (it still failed, but skip mode affects step/run) select is( diff --git a/pkgs/core/supabase/tests/start_tasks/does_not_start_tasks_for_skipped_step.test.sql b/pkgs/core/supabase/tests/start_tasks/does_not_start_tasks_for_skipped_step.test.sql new file mode 100644 index 000000000..cc764736d --- /dev/null +++ b/pkgs/core/supabase/tests/start_tasks/does_not_start_tasks_for_skipped_step.test.sql @@ -0,0 +1,76 @@ +begin; + +select plan(5); +select pgflow_tests.reset_db(); + +-- Test: start_tasks must reject queued tasks from skipped steps +-- Setup: map_a root map with max_attempts=0, when_exhausted='skip' +-- other independent root step (keeps run started) +-- Scenario: Fail map_a[0] -> map_a becomes skipped +-- Try to start queued map_a[1] +-- Expected: start_tasks returns 0 rows for map_a[1] + +-- Create flow with two independent root steps +select pgflow.create_flow('skip_start_guard'); +select pgflow.add_step('skip_start_guard', 'map_a', '{}', max_attempts=>0, step_type=>'map', when_exhausted=>'skip'); +select pgflow.add_step('skip_start_guard', 'other', '{}'); + +-- Start flow with array [1,2] so map_a has 2 tasks +select pgflow.start_flow('skip_start_guard', '[1,2]'::jsonb); + +-- Start only map_a[0] by selecting its message_id and passing to start_tasks +with task0 as ( + select message_id + from pgflow.step_tasks + where flow_slug = 'skip_start_guard' and step_slug = 'map_a' and task_index = 0 +) +select is( + (select count(*) from pgflow.start_tasks('skip_start_guard', array[(select message_id from task0)::bigint], pgflow_tests.ensure_worker('skip_start_guard'))), + 1::bigint, + 'Should start 1 task for map_a[0]' +); + +-- Verify map_a[0] is started +select is( + (select status from pgflow.step_tasks where flow_slug = 'skip_start_guard' and step_slug = 'map_a' and task_index = 0), + 'started', + 'map_a[0] should be started' +); + +-- Verify map_a[1] is still queued +select is( + (select status from pgflow.step_tasks where flow_slug = 'skip_start_guard' and step_slug = 'map_a' and task_index = 1), + 'queued', + 'map_a[1] should be queued' +); + +-- Fail map_a[0] to trigger skip (max_attempts=0, when_exhausted='skip') +select pgflow.fail_task( + (select run_id from pgflow.runs where flow_slug = 'skip_start_guard'), + 'map_a', + 0, + 'handler_failed' +); + +-- Verify map_a step is now skipped +select is( + (select status from pgflow.step_states where flow_slug = 'skip_start_guard' and step_slug = 'map_a'), + 'skipped', + 'map_a step should be skipped after task 0 fails' +); + +-- Attempt to start queued sibling map_a[1] using its message_id +-- Expected: start_tasks returns 0 rows (blocked by step state guard) +with task1 as ( + select message_id + from pgflow.step_tasks + where flow_slug = 'skip_start_guard' and step_slug = 'map_a' and task_index = 1 +) +select is( + (select count(*) from pgflow.start_tasks('skip_start_guard', array[(select message_id from task1)::bigint], pgflow_tests.ensure_worker('skip_start_guard'))), + 0::bigint, + 'Should NOT start task for map_a[1] when step is skipped' +); + +select * from finish(); +rollback;