fix: archive task messages for skipped steps and handle late callbacks#609
fix: archive task messages for skipped steps and handle late callbacks#609jumski wants to merge 1 commit into02-06-cover_more_edge_casesfrom
Conversation
|
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
|
View your CI Pipeline Execution ↗ for commit 3142409
☁️ Nx Cloud last updated this comment at |
c137f91 to
3142409
Compare
| archived_messages AS ( | ||
| SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result | ||
| FROM pgflow.step_tasks st | ||
| WHERE st.run_id = _cascade_force_skip_steps.run_id | ||
| AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk) | ||
| AND st.status IN ('queued', 'started') | ||
| AND st.message_id IS NOT NULL | ||
| HAVING COUNT(st.message_id) > 0 | ||
| ), |
There was a problem hiding this comment.
The archived_messages CTE is defined but never referenced or used. This creates duplicate archiving logic since lines 115-123 perform the same archiving operation again after the CTE chain completes. This causes messages to be archived twice or the first archiving to fail if no messages match at CTE execution time.
-- Remove the unused CTE entirely:
-- archived_messages AS (
-- SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result
-- ...
-- ),
-- Keep only the PERFORM statement at lines 115-123The CTE result is never consumed (no SELECT * FROM archived_messages or similar), making it dead code that still executes and potentially conflicts with the later PERFORM statement.
| archived_messages AS ( | |
| SELECT pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id)) as result | |
| FROM pgflow.step_tasks st | |
| WHERE st.run_id = _cascade_force_skip_steps.run_id | |
| AND st.step_slug IN (SELECT sk.step_slug FROM skipped sk) | |
| AND st.status IN ('queued', 'started') | |
| AND st.message_id IS NOT NULL | |
| HAVING COUNT(st.message_id) > 0 | |
| ), | |
| -- archived_messages CTE removed as it was redundant with later archiving code |
Spotted by Graphite Agent
Is this helpful? React 👍 or 👎 to let us know.
🔍 Preview Deployment: Website✅ Deployment successful! 🔗 Preview URL: https://pr-609.pgflow.pages.dev 📝 Details:
_Last updated: _ |

Archive Task Messages for Skipped Steps
This PR adds message archiving functionality to prevent "zombie" tasks from being processed after a step has been skipped or completed. Key improvements:
Added guards in
complete_taskandfail_taskto handle late callbacks:Enhanced
_cascade_force_skip_stepsto archive queued/started task messages for skipped steps:Modified
fail_taskto archive sibling task messages when a step is skipped:Improved
start_tasksto reject tasks for skipped steps:Added comprehensive tests to verify all these behaviors work correctly.