Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { PrismaClient, prisma } from "~/db.server";
import { type PrismaClient, prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { createSSELoader } from "~/utils/sse";
import { createSSELoader, SendFunction } from "~/utils/sse";
import { throttle } from "~/utils/throttle";
import { tracePubSub } from "~/v3/services/tracePubSub.server";

const PING_INTERVAL = 1000;
const STREAM_TIMEOUT = 30 * 1000; // 30 seconds
const PING_INTERVAL = 5_000;
const STREAM_TIMEOUT = 30_000;

export class RunStreamPresenter {
#prismaClient: PrismaClient;
Expand Down Expand Up @@ -49,36 +49,40 @@ export class RunStreamPresenter {
// Subscribe to trace updates
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);

// Store throttled send function and message listener for cleanup
let throttledSend: ReturnType<typeof throttle> | undefined;
// Only send max every 1 second
const throttledSend = throttle(
(args: { send: SendFunction; event?: string; data: string }) => {
try {
args.send({ event: args.event, data: args.data });
} catch (error) {
if (error instanceof Error) {
if (error.name !== "TypeError") {
logger.debug("Error sending SSE in RunStreamPresenter", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
});
}
}
// Abort the stream on send error
context.controller.abort("Send error");
}
},
1000
);

let messageListener: ((event: string) => void) | undefined;

return {
initStream: ({ send }) => {
// Create throttled send function
throttledSend = throttle((args: { event?: string; data: string }) => {
try {
send(args);
} catch (error) {
if (error instanceof Error) {
if (error.name !== "TypeError") {
logger.debug("Error sending SSE in RunStreamPresenter", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
});
}
}
// Abort the stream on send error
context.controller.abort("Send error");
}
}, 1000);
throttledSend({ send, event: "message", data: new Date().toISOString() });

// Set up message listener for pub/sub events
messageListener = (event: string) => {
throttledSend?.({ data: event });
throttledSend({ send, event: "message", data: event });
};
eventEmitter.addListener("message", messageListener);

Expand All @@ -88,7 +92,8 @@ export class RunStreamPresenter {
iterator: ({ send }) => {
// Send ping to keep connection alive
try {
send({ event: "ping", data: new Date().toISOString() });
// Send an actual message so the client refreshes
throttledSend({ send, event: "message", data: new Date().toISOString() });
} catch (error) {
// If we can't send a ping, the connection is likely dead
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,24 @@ export default function Page() {
);
}

function shouldLiveReload({
events,
maximumLiveReloadingSetting,
run,
}: {
events: TraceEvent[];
maximumLiveReloadingSetting: number;
run: { completedAt: string | null };
}): boolean {
// We don't live reload if there are a ton of spans/logs
if (events.length > maximumLiveReloadingSetting) return false;

// If the run was completed a while ago, we don't need to live reload anymore
if (run.completedAt && new Date(run.completedAt).getTime() < Date.now() - 30_000) return false;

return true;
}

function TraceView({
run,
trace,
Expand All @@ -453,18 +471,19 @@ function TraceView({

const { events, duration, rootSpanStatus, rootStartedAt, queuedDuration, overridesBySpanId } =
trace;
const shouldLiveReload = events.length <= maximumLiveReloadingSetting;

const changeToSpan = useDebounce((selectedSpan: string) => {
replaceSearchParam("span", selectedSpan, { replace: true });
}, 250);

const isLiveReloading = shouldLiveReload({ events, maximumLiveReloadingSetting, run });

const revalidator = useRevalidator();
const streamedEvents = useEventSource(
v3RunStreamingPath(organization, project, environment, run),
{
event: "message",
disabled: !shouldLiveReload,
disabled: !isLiveReloading,
}
);
useEffect(() => {
Expand Down Expand Up @@ -511,7 +530,7 @@ function TraceView({
rootStartedAt={rootStartedAt ? new Date(rootStartedAt) : undefined}
queuedDuration={queuedDuration}
environmentType={run.environment.type}
shouldLiveReload={shouldLiveReload}
shouldLiveReload={isLiveReloading}
maximumLiveReloadingSetting={maximumLiveReloadingSetting}
rootRun={run.rootTaskRun}
parentRun={run.parentTaskRun}
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/utils/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//From: https://kettanaito.com/blog/debounce-vs-throttle

/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */
export function throttle(
func: (...args: any[]) => void,
export function throttle<TArgs extends unknown[]>(
func: (...args: TArgs) => void,
durationMs: number
): (...args: any[]) => void {
): (...args: TArgs) => void {
let isPrimedToFire = false;

return (...args: any[]) => {
return (...args: TArgs) => {
if (!isPrimedToFire) {
isPrimedToFire = true;

Expand Down