Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,43 @@ BEGIN
failed_at = now()
WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', cascade_resolve_conditions.run_id,
'step_slug', v_first_fail.step_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
concat('step:', v_first_fail.step_slug, ':failed'),
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', cascade_resolve_conditions.run_id,
'flow_slug', v_first_fail.flow_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);

PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = cascade_resolve_conditions.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;

RETURN false;
END IF;

Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/schemas/0100_function_create_flow_from_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ BEGIN
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_exhausted => v_step->>'whenExhausted',
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'),
required_input_pattern => CASE
WHEN (v_step->'requiredInputPattern'->>'defined')::boolean
THEN v_step->'requiredInputPattern'->'value'
Expand Down
71 changes: 34 additions & 37 deletions pkgs/core/schemas/0100_function_fail_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,45 +140,42 @@ maybe_fail_step AS (
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
),
run_update AS (
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING pgflow.runs.status
)
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;
SELECT
COALESCE((SELECT status = 'failed' FROM run_update), false),
COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false),
COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false),
COALESCE((SELECT is_exhausted FROM task_status), false)
INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted;

-- Capture when_exhausted mode and check if step was skipped for later processing
-- Capture when_exhausted mode for later skip handling
SELECT s.when_exhausted INTO v_when_exhausted
FROM pgflow.steps s
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

SELECT (status = 'skipped') INTO v_step_skipped
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;

-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
IF v_task_exhausted AND v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
Expand All @@ -194,8 +191,8 @@ IF v_step_failed THEN
);
END IF;

-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_step_skipped THEN
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_task_exhausted AND v_step_skipped THEN
-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
Expand Down Expand Up @@ -237,11 +234,11 @@ END IF;
AND dep.dep_slug = fail_task.step_slug
AND child_state.step_slug = dep.step_slug;

-- Start any steps that became ready after decrementing remaining_deps
PERFORM pgflow.start_ready_steps(fail_task.run_id);

-- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep)
PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id);

-- Start steps that became ready after taskless completion propagation
PERFORM pgflow.start_ready_steps(fail_task.run_id);
END IF;

-- Try to complete the run (remaining_steps may now be 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ BEGIN
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_exhausted => v_step->>'whenExhausted',
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'),
required_input_pattern => CASE
WHEN (v_step->'requiredInputPattern'->>'defined')::boolean
THEN v_step->'requiredInputPattern'->'value'
Expand Down Expand Up @@ -533,6 +533,43 @@ BEGIN
failed_at = now()
WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', cascade_resolve_conditions.run_id,
'step_slug', v_first_fail.step_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
concat('step:', v_first_fail.step_slug, ':failed'),
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', cascade_resolve_conditions.run_id,
'flow_slug', v_first_fail.flow_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);

PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = cascade_resolve_conditions.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;

RETURN false;
END IF;

Expand Down Expand Up @@ -1315,45 +1352,42 @@ maybe_fail_step AS (
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
),
run_update AS (
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING pgflow.runs.status
)
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;
SELECT
COALESCE((SELECT status = 'failed' FROM run_update), false),
COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false),
COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false),
COALESCE((SELECT is_exhausted FROM task_status), false)
INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted;

-- Capture when_exhausted mode and check if step was skipped for later processing
-- Capture when_exhausted mode for later skip handling
SELECT s.when_exhausted INTO v_when_exhausted
FROM pgflow.steps s
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

SELECT (status = 'skipped') INTO v_step_skipped
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;

-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
IF v_task_exhausted AND v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
Expand All @@ -1369,8 +1403,8 @@ IF v_step_failed THEN
);
END IF;

-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_step_skipped THEN
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_task_exhausted AND v_step_skipped THEN
-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
Expand Down Expand Up @@ -1412,11 +1446,11 @@ END IF;
AND dep.dep_slug = fail_task.step_slug
AND child_state.step_slug = dep.step_slug;

-- Start any steps that became ready after decrementing remaining_deps
PERFORM pgflow.start_ready_steps(fail_task.run_id);

-- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep)
PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id);

-- Start steps that became ready after taskless completion propagation
PERFORM pgflow.start_ready_steps(fail_task.run_id);
END IF;

-- Try to complete the run (remaining_steps may now be 0)
Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU=
h1:dzDaDNhlNGmZ64MoAwWQgOSAWXwmFPwjUWX6AFIKQ/s=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -18,4 +18,4 @@ h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow=
20260206115746_pgflow_step_conditions.sql h1:gxuxa1iDgOSA/yCO446tUIVnhrAvwVdLR1w334yGzMk=
Loading