Skip to content

Commit 5b8dba0

Browse files
committed
done
1 parent 7af4025 commit 5b8dba0

File tree

5 files changed

+203
-107
lines changed

5 files changed

+203
-107
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
10341034
})
10351035
finalMetaStatus = 'error'
10361036
} finally {
1037-
await eventWriter.close()
1037+
try {
1038+
await eventWriter.close()
1039+
} catch (closeError) {
1040+
logger.warn(`[${requestId}] Failed to close event writer`, {
1041+
error: closeError instanceof Error ? closeError.message : String(closeError),
1042+
})
1043+
}
10381044
if (finalMetaStatus) {
10391045
setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {})
10401046
}

apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
1313
const logger = createLogger('ExecutionStreamReconnectAPI')
1414

1515
const POLL_INTERVAL_MS = 500
16+
const MAX_POLL_DURATION_MS = 10 * 60 * 1000 // 10 minutes
1617

1718
function isTerminalStatus(status: ExecutionStreamStatus): boolean {
1819
return status === 'complete' || status === 'error' || status === 'cancelled'
@@ -70,10 +71,13 @@ export async function GET(
7071

7172
const encoder = new TextEncoder()
7273

74+
// Hoisted so cancel() can signal the polling loop to stop
75+
let closed = false
76+
7377
const stream = new ReadableStream<Uint8Array>({
7478
async start(controller) {
7579
let lastEventId = fromEventId
76-
let closed = false
80+
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
7781

7882
const enqueue = (text: string) => {
7983
if (closed) return
@@ -101,8 +105,8 @@ export async function GET(
101105
return
102106
}
103107

104-
// Poll for new events until execution completes
105-
while (!closed) {
108+
// Poll for new events until execution completes or deadline is reached
109+
while (!closed && Date.now() < pollDeadline) {
106110
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
107111
if (closed) return
108112

@@ -127,6 +131,13 @@ export async function GET(
127131
return
128132
}
129133
}
134+
135+
// Deadline reached — close gracefully
136+
if (!closed) {
137+
logger.warn('Reconnection stream poll deadline reached', { executionId })
138+
enqueue('data: [DONE]\n\n')
139+
controller.close()
140+
}
130141
} catch (error) {
131142
logger.error('Error in reconnection stream', {
132143
executionId,
@@ -140,6 +151,7 @@ export async function GET(
140151
}
141152
},
142153
cancel() {
154+
closed = true
143155
logger.info('Client disconnected from reconnection stream', { executionId })
144156
},
145157
})

0 commit comments

Comments
 (0)