From 30944924299e95877cbfd4c08b304eafa8ef559a Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 6 Feb 2026 22:17:11 +0100 Subject: [PATCH] cover more edge cases --- ...00_function_cascade_resolve_conditions.sql | 37 ++++++ .../0100_function_create_flow_from_shape.sql | 4 +- pkgs/core/schemas/0100_function_fail_task.sql | 71 +++++----- .../20260206115746_pgflow_step_conditions.sql | 112 ++++++++++------ pkgs/core/supabase/migrations/atlas.sum | 4 +- ...met_fail_archives_active_messages.test.sql | 79 ++++++++++++ ...t_unmet_fail_emits_failure_events.test.sql | 88 +++++++++++++ .../missing_condition_modes_defaults.test.sql | 67 ++++++++++ ...ayed_fail_task_skip_is_idempotent.test.sql | 121 ++++++++++++++++++ ...en_taskless_map_starts_downstream.test.sql | 101 +++++++++++++++ 10 files changed, 604 insertions(+), 80 deletions(-) create mode 100644 pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql create mode 100644 pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql create mode 100644 pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql diff --git a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql index 2a86d4861..8a953f179 100644 --- a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql +++ b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql @@ -116,6 +116,43 @@ BEGIN failed_at = now() WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id; + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'step_slug', v_first_fail.step_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + concat('step:', v_first_fail.step_slug, ':failed'), + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'flow_slug', v_first_fail.flow_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = cascade_resolve_conditions.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + RETURN false; END IF; diff --git a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql index c9501921f..a6006432d 100644 --- a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql +++ b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql @@ -48,8 +48,8 @@ BEGIN timeout => (v_step_options->>'timeout')::int, start_delay => (v_step_options->>'startDelay')::int, step_type => v_step->>'stepType', - when_unmet => v_step->>'whenUnmet', - when_exhausted => v_step->>'whenExhausted', + when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'), + when_exhausted => COALESCE(v_step->>'whenExhausted', 
'fail'), required_input_pattern => CASE WHEN (v_step->'requiredInputPattern'->>'defined')::boolean THEN v_step->'requiredInputPattern'->'value' diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index e926dbd18..08b0df90b 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -140,45 +140,42 @@ maybe_fail_step AS ( WHERE pgflow.step_states.run_id = fail_task.run_id AND pgflow.step_states.step_slug = fail_task.step_slug RETURNING pgflow.step_states.* +), +run_update AS ( + -- Update run status: only fail when when_exhausted='fail' and step was failed + UPDATE pgflow.runs + SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END, + -- Decrement remaining_steps when step was skipped (not failed, run continues) + remaining_steps = CASE + WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + ELSE pgflow.runs.remaining_steps + END + WHERE pgflow.runs.run_id = fail_task.run_id + RETURNING pgflow.runs.status ) - -- Update run status: only fail when when_exhausted='fail' and step was failed -UPDATE pgflow.runs -SET status = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' - ELSE status - END, - failed_at = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN now() - ELSE NULL - END, - -- Decrement remaining_steps when step was skipped (not failed, run continues) - remaining_steps = CASE - WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 - ELSE pgflow.runs.remaining_steps - END -WHERE pgflow.runs.run_id = fail_task.run_id -RETURNING (status = 'failed') INTO v_run_failed; +SELECT + COALESCE((SELECT status = 'failed' FROM run_update), false), + COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false), + COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false), + COALESCE((SELECT is_exhausted FROM task_status), false) +INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted; - -- Capture when_exhausted mode and check if step was skipped for later processing + -- Capture when_exhausted mode for later skip handling SELECT s.when_exhausted INTO v_when_exhausted FROM pgflow.steps s JOIN pgflow.runs r ON r.flow_slug = s.flow_slug -WHERE r.run_id = fail_task.run_id - AND s.step_slug = fail_task.step_slug; - -SELECT (status = 'skipped') INTO v_step_skipped -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; - --- Check if step failed by querying the step_states table -SELECT (status = 'failed') INTO v_step_failed -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug; -- Send broadcast event for step failure if the step was failed -IF v_step_failed THEN +IF v_task_exhausted AND v_step_failed THEN PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:failed', @@ -194,8 +191,8 @@ IF v_step_failed THEN ); END IF; - -- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') - IF v_step_skipped THEN +-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') + IF v_task_exhausted AND v_step_skipped THEN -- Send broadcast event for step skipped 
PERFORM realtime.send( jsonb_build_object( @@ -237,11 +234,11 @@ END IF; AND dep.dep_slug = fail_task.step_slug AND child_state.step_slug = dep.step_slug; - -- Start any steps that became ready after decrementing remaining_deps - PERFORM pgflow.start_ready_steps(fail_task.run_id); - -- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep) PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id); + + -- Start steps that became ready after taskless completion propagation + PERFORM pgflow.start_ready_steps(fail_task.run_id); END IF; -- Try to complete the run (remaining_steps may now be 0) diff --git a/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql index 7550a3c86..9353e5730 100644 --- a/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql @@ -273,8 +273,8 @@ BEGIN timeout => (v_step_options->>'timeout')::int, start_delay => (v_step_options->>'startDelay')::int, step_type => v_step->>'stepType', - when_unmet => v_step->>'whenUnmet', - when_exhausted => v_step->>'whenExhausted', + when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'), + when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'), required_input_pattern => CASE WHEN (v_step->'requiredInputPattern'->>'defined')::boolean THEN v_step->'requiredInputPattern'->'value' @@ -533,6 +533,43 @@ BEGIN failed_at = now() WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id; + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'step_slug', v_first_fail.step_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + concat('step:', v_first_fail.step_slug, ':failed'), + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'flow_slug', v_first_fail.flow_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = cascade_resolve_conditions.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + RETURN false; END IF; @@ -1315,45 +1352,42 @@ maybe_fail_step AS ( WHERE pgflow.step_states.run_id = fail_task.run_id AND pgflow.step_states.step_slug = fail_task.step_slug RETURNING pgflow.step_states.* +), +run_update AS ( + -- Update run status: only fail when when_exhausted='fail' and step was failed + UPDATE pgflow.runs + SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END, + -- Decrement remaining_steps when step was skipped (not failed, run continues) + remaining_steps = CASE + WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + ELSE pgflow.runs.remaining_steps + END + WHERE pgflow.runs.run_id = fail_task.run_id + RETURNING pgflow.runs.status ) - -- Update run status: only fail when when_exhausted='fail' 
and step was failed -UPDATE pgflow.runs -SET status = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' - ELSE status - END, - failed_at = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN now() - ELSE NULL - END, - -- Decrement remaining_steps when step was skipped (not failed, run continues) - remaining_steps = CASE - WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 - ELSE pgflow.runs.remaining_steps - END -WHERE pgflow.runs.run_id = fail_task.run_id -RETURNING (status = 'failed') INTO v_run_failed; +SELECT + COALESCE((SELECT status = 'failed' FROM run_update), false), + COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false), + COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false), + COALESCE((SELECT is_exhausted FROM task_status), false) +INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted; - -- Capture when_exhausted mode and check if step was skipped for later processing + -- Capture when_exhausted mode for later skip handling SELECT s.when_exhausted INTO v_when_exhausted FROM pgflow.steps s JOIN pgflow.runs r ON r.flow_slug = s.flow_slug -WHERE r.run_id = fail_task.run_id - AND s.step_slug = fail_task.step_slug; - -SELECT (status = 'skipped') INTO v_step_skipped -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; - --- Check if step failed by querying the step_states table -SELECT (status = 'failed') INTO v_step_failed -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug; -- Send broadcast event for step failure if the step was failed -IF v_step_failed THEN +IF v_task_exhausted AND v_step_failed THEN PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:failed', @@ -1369,8 +1403,8 @@ IF v_step_failed THEN ); END IF; - -- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') - IF v_step_skipped THEN +-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') + IF v_task_exhausted AND v_step_skipped THEN -- Send broadcast event for step skipped PERFORM realtime.send( jsonb_build_object( @@ -1412,11 +1446,11 @@ END IF; AND dep.dep_slug = fail_task.step_slug AND child_state.step_slug = dep.step_slug; - -- Start any steps that became ready after decrementing remaining_deps - PERFORM pgflow.start_ready_steps(fail_task.run_id); - -- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep) PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id); + + -- Start steps that became ready after taskless completion propagation + PERFORM pgflow.start_ready_steps(fail_task.run_id); END IF; -- Try to complete the run (remaining_steps may now be 0) diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index c98885fa9..87b7f7f22 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU= +h1:dzDaDNhlNGmZ64MoAwWQgOSAWXwmFPwjUWX6AFIKQ/s= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql 
h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,4 +18,4 @@ h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= -20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow= +20260206115746_pgflow_step_conditions.sql h1:gxuxa1iDgOSA/yCO446tUIVnhrAvwVdLR1w334yGzMk= diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql new file mode 100644 index 000000000..b43dcf413 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql @@ -0,0 +1,79 @@ +begin; +select plan(4); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('dependent_fail_archive'); +select pgflow.add_step( + flow_slug => 'dependent_fail_archive', + step_slug => 'first' +); +select pgflow.add_step( + flow_slug => 'dependent_fail_archive', + step_slug => 'second' +); +select pgflow.add_step( + flow_slug => 'dependent_fail_archive', + step_slug => 'checker', + deps_slugs => array['first'], + required_input_pattern => '{"ok": true}'::jsonb, + when_unmet => 'fail' +); + +with run as ( + select * + from pgflow.start_flow('dependent_fail_archive', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select pgflow_tests.read_and_start('dependent_fail_archive'); + +select pgflow.complete_task( + run_id => (select run_id from run_ids), + step_slug => 'first', + task_index => 0, + output => '{"ok": false}'::jsonb +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'failed', + 'run should fail when dependent fail-condition is unmet' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'checker' + ), + 'failed', + 'checker should fail due to unmet condition' +); + +select is( + ( + select count(*) + from pgmq.q_dependent_fail_archive + ), + 0::bigint, + 'run failure should archive all active queue messages' +); + +select ok( + ( + select count(*) + from pgmq.a_dependent_fail_archive + ) >= 2, + 'archive queue should contain completed and run-failure archived messages' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql new file mode 100644 index 000000000..cde264bbf --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql @@ -0,0 +1,88 @@ +begin; +select plan(6); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('root_fail_events'); +select pgflow.add_step( + flow_slug => 'root_fail_events', + step_slug => 'guarded', + required_input_pattern => '{"ok": true}'::jsonb, + when_unmet => 'fail' +); + +with run as ( + select * + from pgflow.start_flow('root_fail_events', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'guarded' + ), + 'failed', + 
'guarded step should fail when root condition is unmet' +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'failed', + 'run should fail when root fail-condition is unmet' +); + +select is( + pgflow_tests.count_realtime_events( + event_type => 'step:failed', + run_id => (select run_id from run_ids), + step_slug => 'guarded' + ), + 1::integer, + 'should emit one step:failed event' +); + +select is( + pgflow_tests.count_realtime_events( + event_type => 'run:failed', + run_id => (select run_id from run_ids) + ), + 1::integer, + 'should emit one run:failed event' +); + +select is( + ( + select payload->>'status' + from pgflow_tests.get_realtime_message( + event_type => 'step:failed', + run_id => (select run_id from run_ids), + step_slug => 'guarded' + ) + ), + 'failed', + 'step:failed payload should include failed status' +); + +select is( + ( + select payload->>'status' + from pgflow_tests.get_realtime_message( + event_type => 'run:failed', + run_id => (select run_id from run_ids) + ) + ), + 'failed', + 'run:failed payload should include failed status' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql new file mode 100644 index 000000000..59f997bf6 --- /dev/null +++ b/pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql @@ -0,0 +1,67 @@ +begin; +select plan(5); + +select pgflow_tests.reset_db(); + +select lives_ok( + $$ + select pgflow._create_flow_from_shape( + p_flow_slug => 'legacy_shape_defaults', + p_shape => '{ + "steps": [ + { + "slug": "first", + "stepType": "single", + "dependencies": [] + } + ] + }'::jsonb + ) + $$, + 'legacy shape without condition mode fields should compile' +); + +select is( + ( + select when_unmet + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'skip', + 'missing whenUnmet should default to skip' +); + +select is( + ( + select when_exhausted + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'fail', + 'missing whenExhausted should default to fail' +); + +select ok( + ( + select required_input_pattern is null + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'required_input_pattern should remain null when omitted' +); + +select ok( + ( + select forbidden_input_pattern is null + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'forbidden_input_pattern should remain null when omitted' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql new file mode 100644 index 000000000..729b37040 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql @@ -0,0 +1,121 @@ +begin; +select plan(7); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('replayed_skip_idempotent'); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'a', + max_attempts => 0, + when_exhausted => 'skip' +); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'd' +); +select pgflow.add_step( + 
flow_slug => 'replayed_skip_idempotent', + step_slug => 'c', + deps_slugs => array['d'] +); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'b', + deps_slugs => array['a', 'c'] +); + +with run as ( + select * + from pgflow.start_flow('replayed_skip_idempotent', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select pgflow_tests.poll_and_fail('replayed_skip_idempotent'); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'a' + ), + 'skipped', + 'first fail should skip step a' +); + +select is( + ( + select remaining_deps + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 1, + 'first fail should decrement b remaining deps from 2 to 1' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 'created', + 'b should still be waiting on c after first fail' +); + +select pgflow.fail_task( + run_id => (select run_id from run_ids), + step_slug => 'a', + task_index => 0, + error_message => 'replayed failure' +); + +select is( + ( + select remaining_deps + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 1, + 'replayed fail_task should not decrement b remaining deps again' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 'created', + 'replayed fail_task should not start b prematurely' +); + +select is( + pgflow_tests.count_realtime_events( + event_type => 'step:skipped', + run_id => (select run_id from run_ids), + step_slug => 'a' + ), + 1::integer, + 'replayed fail_task should not emit duplicate step:skipped event' +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'started', + 'run should remain started while c is incomplete' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql new file mode 100644 index 000000000..12df9f679 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql @@ -0,0 +1,101 @@ +begin; +select plan(6); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('skip_map_downstream'); +select pgflow.add_step( + flow_slug => 'skip_map_downstream', + step_slug => 'a', + max_attempts => 0, + when_exhausted => 'skip' +); +select pgflow.add_step( + flow_slug => 'skip_map_downstream', + step_slug => 'm', + step_type => 'map', + deps_slugs => array['a'] +); +select pgflow.add_step( + flow_slug => 'skip_map_downstream', + step_slug => 'z', + deps_slugs => array['m'] +); + +with run as ( + select * + from pgflow.start_flow('skip_map_downstream', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select pgflow_tests.poll_and_fail('skip_map_downstream'); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'a' + ), + 'skipped', + 'source step should be skipped after exhaustion' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'm' + ), + 'completed', + 'taskless map should auto-complete 
after dependency skip' +); + +select is( + ( + select output + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'm' + ), + '[]'::jsonb, + 'auto-completed map should emit empty array output' +); + +select is( + ( + select remaining_deps + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'z' + ), + 0, + 'downstream step should have all dependencies resolved' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'z' + ), + 'started', + 'downstream step should be started after map auto-completion' +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'started', + 'run should remain started while downstream work is in progress' +); + +drop table if exists run_ids; + +select finish(); +rollback;
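
-- Editor's sketch (not part of the patch): the explicit counterpart to the COALESCE
-- defaults added to create_flow_from_shape above. It assumes only the shape keys this
-- change already reads (whenUnmet / whenExhausted) and the 'skip' / 'fail' modes used
-- by the tests; the flow and step slugs are hypothetical.
begin;

select pgflow._create_flow_from_shape(
  p_flow_slug => 'explicit_shape_modes',
  p_shape => '{
    "steps": [
      {
        "slug": "guarded",
        "stepType": "single",
        "dependencies": [],
        "whenUnmet": "fail",
        "whenExhausted": "skip"
      }
    ]
  }'::jsonb
);

-- Stored modes should reflect the explicit values rather than the
-- 'skip' / 'fail' fallbacks applied when the keys are absent.
select when_unmet, when_exhausted
from pgflow.steps
where flow_slug = 'explicit_shape_modes'
  and step_slug = 'guarded';

rollback;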