fix: archive task messages for skipped steps and handle late callbacks #609

Open
jumski wants to merge 1 commit into 02-06-cover_more_edge_cases from 02-14-fix_missing_skip_archival

jumski commented Feb 14, 2026

Archive Task Messages for Skipped Steps

This PR archives task messages so that "zombie" tasks are not processed after their step has been skipped or completed. Key changes:

  1. Added guards in complete_task and fail_task to handle late callbacks:

    • Detects when callbacks arrive after a step is no longer in 'started' state
    • Archives the task message to prevent stuck work
    • Returns current task state without mutating step or run state
  2. Enhanced _cascade_force_skip_steps to archive queued/started task messages for skipped steps:

    • Ensures all task messages are archived when a step is force-skipped
    • Prevents workers from processing tasks for steps that are already skipped
  3. Modified fail_task to archive sibling task messages when a step is skipped:

    • When a task failure causes a step to be skipped, all sibling task messages are archived
    • Prevents double-decrement of remaining_steps when multiple tasks fail on the same step
  4. Improved start_tasks to reject tasks for skipped steps:

    • Added guard conditions to only start tasks for steps in 'started' state
    • Prevents race conditions where a task might be started after its step was skipped
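The guards in items 1 and 4 can be illustrated with a small in-memory model. This is a hedged Python sketch, not the PR's SQL: the `Task`, `Step` and `Queue` classes and the `remaining_tasks` counter are hypothetical stand-ins, and only the control flow (check the step state first, archive the message, leave step state untouched on a late callback) follows the description above.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    message_id: int
    status: str = "queued"  # queued | started | completed


@dataclass
class Step:
    status: str = "started"  # started | completed | skipped
    remaining_tasks: int = 0
    tasks: list = field(default_factory=list)


class Queue:
    """Hypothetical stand-in for the message queue; archive() only records ids."""

    def __init__(self):
        self.archived = set()

    def archive(self, message_id):
        self.archived.add(message_id)


def start_task(step, task):
    """Start a queued task only while its step is still 'started'."""
    if step.status != "started" or task.status != "queued":
        return False
    task.status = "started"
    return True


def complete_task(step, task, queue):
    """Complete a task; a callback for a settled step is treated as late."""
    if step.status != "started":
        # Late callback: archive the message and return the task's
        # current status without touching step state.
        queue.archive(task.message_id)
        return task.status
    task.status = "completed"
    queue.archive(task.message_id)
    step.remaining_tasks -= 1
    if step.remaining_tasks == 0:
        step.status = "completed"
    return task.status
```

In this model, a task that was started before its step got skipped can still call `complete_task`; its message ends up archived, and the step's counters stay as they were.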

Added tests covering each of the behaviors above.
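One scenario worth pinning down is the double-decrement case from item 3. The sketch below is a hedged Python model, not the PR's pgTAP tests or SQL: `Run`, `Step`, `Task` and the skip-on-failure policy are assumptions made for illustration. It shows why the second failing sibling must hit the late-callback guard instead of decrementing `remaining_steps` again.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    message_id: int
    status: str = "started"


@dataclass
class Step:
    status: str = "started"
    tasks: list = field(default_factory=list)


@dataclass
class Run:
    remaining_steps: int
    archived: set = field(default_factory=set)  # ids of archived messages


def fail_task(run, step, task):
    """Fail a task under a hypothetical 'skip the step on failure' policy."""
    if step.status != "started":
        # Late failure: a sibling already skipped the step, so only
        # archive this message and leave the run counters untouched.
        run.archived.add(task.message_id)
        return task.status
    task.status = "failed"
    step.status = "skipped"
    run.remaining_steps -= 1
    # Archive the failing task's message and every sibling's message
    # so that no worker picks them up afterwards.
    for sibling in step.tasks:
        run.archived.add(sibling.message_id)
    return task.status
```

With two tasks on one step, the first `fail_task` call skips the step and decrements `remaining_steps` once; the second call returns the sibling's unchanged status and changes nothing else.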

changeset-bot bot commented Feb 14, 2026

⚠️ No Changeset found

Latest commit: 3142409

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

nx-cloud bot commented Feb 14, 2026

CI Pipeline Execution for commit 3142409

| Command | Status | Duration |
| --- | --- | --- |
| nx run edge-worker:test:integration | ✅ Succeeded | 7m 1s |
| nx run client:e2e | ✅ Succeeded | 3m 46s |
| nx affected -t verify-exports --base=origin/mai... | ✅ Succeeded | 4s |
| nx affected -t build --configuration=production... | ✅ Succeeded | 4s |
| nx affected -t lint typecheck test --parallel -... | ✅ Succeeded | 3m 4s |
| nx run core:pgtap | ✅ Succeeded | 2m 33s |
| nx run cli:e2e | ✅ Succeeded | 5s |
| nx run edge-worker:e2e | ✅ Succeeded | 41s |

☁️ Nx Cloud last updated this comment at 2026-02-14 18:36:42 UTC

jumski force-pushed the 02-14-fix_missing_skip_archival branch from c137f91 to 3142409 on February 14, 2026 18:26
Comment on lines +94 to +102
archived_messages AS (
  SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
  FROM pgflow.step_tasks st
  WHERE st.run_id = _cascade_force_skip_steps.run_id
    AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
    AND st.status IN ('queued', 'started')
    AND st.message_id IS NOT NULL
  HAVING COUNT(st.message_id) > 0
),

The archived_messages CTE is defined but never referenced, and lines 115-123 perform the same archiving again after the CTE chain completes, so the archiving logic is duplicated.

-- Remove the unused CTE entirely:
-- archived_messages AS (
--   SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
--   ...
-- ),

-- Keep only the PERFORM statement at lines 115-123

The CTE result is never consumed (no SELECT * FROM archived_messages or similar). PostgreSQL does not evaluate a SELECT-only CTE that the rest of the query never references, so the pgmq.archive call inside it most likely never runs. The CTE is therefore dead code, and the later PERFORM statement is what actually archives the messages.

Suggested change
- archived_messages AS (
-   SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
-   FROM pgflow.step_tasks st
-   WHERE st.run_id = _cascade_force_skip_steps.run_id
-     AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk)
-     AND st.status IN ('queued', 'started')
-     AND st.message_id IS NOT NULL
-   HAVING COUNT(st.message_id) > 0
- ),
+ -- archived_messages CTE removed as it was redundant with later archiving code
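Whichever copy of the archiving statement is kept, its selection logic can be modelled as follows. This is a hedged Python sketch: the dictionary keys mirror the column names in the snippet under review, while `archive` is a hypothetical callback standing in for the batch form of `pgmq.archive`. The point is the guard: collect the message ids of unfinished tasks on skipped steps, and call the archive function once, or not at all when the batch is empty (the role `HAVING COUNT(...) > 0` plays in the SQL).

```python
def message_ids_to_archive(tasks, skipped_steps):
    """Return message ids of unfinished tasks that belong to skipped steps."""
    return [
        t["message_id"]
        for t in tasks
        if t["step_slug"] in skipped_steps
        and t["status"] in ("queued", "started")
        and t["message_id"] is not None
    ]


def archive_skipped(tasks, skipped_steps, archive):
    """Call archive once with the whole batch, or not at all if it is empty."""
    ids = message_ids_to_archive(tasks, skipped_steps)
    if ids:  # mirrors HAVING COUNT(st.message_id) > 0
        archive(ids)
    return ids
```

Passing the archive function in as a parameter keeps the sketch independent of any real queue client.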

Spotted by Graphite Agent

@github-actions

🔍 Preview Deployment: Website

Deployment successful!

🔗 Preview URL: https://pr-609.pgflow.pages.dev

📝 Details:

  • Branch: 02-14-fix_missing_skip_archival
  • Commit: 609e33b572a873741bb16a59fd761582c52d5d1f