feat(grouper): add Prometheus metrics #520

Open

Kuchizu wants to merge 5 commits into master from feat/grouper-metrics

Conversation

Kuchizu (Member) commented Feb 4, 2026

Add Prometheus metrics to the Grouper worker: an events counter (new/repeated), a processing-duration histogram, and an errors counter. Metrics are pushed to the Pushgateway every 10 seconds.

grouper-metrics-hawk_worker_grouper_metrics_e2e_1771436868147.log

Add MongoDB, payload and delta size metrics with OOM-debug logging
Kuchizu requested a review from neSpecc February 10, 2026 21:17
Copilot AI (Contributor) left a comment

Pull request overview

This pull request adds Prometheus metrics instrumentation to the Grouper worker to enable monitoring of event processing. The implementation creates a shared metrics registry, adds various counters and histograms to track events, errors, durations, and data sizes, and sets up automatic periodic pushing to a Prometheus Pushgateway.

Changes:

  • Added new lib/metrics.ts module with shared Prometheus registry and push functionality
  • Instrumented GrouperWorker with 7 metrics: events counter (new/repeated), processing duration, errors, MongoDB operation durations, delta/payload sizes, and duplicate retries (sketched below)
  • Re-enabled metrics pushing in runner with simplified initialization
  • Added environment variables for memory monitoring configuration
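
For orientation, the GrouperWorker instrumentation roughly follows the shape sketched below, using prom-client against the shared registry. The field names metricsHandleDuration, metricsMongoDuration and metricsDuplicateRetries appear in the diff; the metric names, help strings, labels and the import path are illustrative assumptions, not the PR's exact code.

```ts
// Sketch only: approximate shape of the metrics declared in workers/grouper/src/index.ts.
// Metric names, labels and the relative import path are assumptions.
import { client, register } from '../../../lib/metrics';

const metricsEventsTotal = new client.Counter({
  name: 'hawk_grouper_events_total',
  help: 'Events processed by the Grouper, labeled new or repeated',
  labelNames: ['type'],
  registers: [register],
});

const metricsHandleDuration = new client.Histogram({
  name: 'hawk_grouper_handle_duration_seconds',
  help: 'Duration of a single handle() call',
  registers: [register],
});

const metricsMongoDuration = new client.Histogram({
  name: 'hawk_grouper_mongo_duration_seconds',
  help: 'Duration of MongoDB operations (getEvent, saveEvent, ...)',
  labelNames: ['operation'],
  registers: [register],
});

const metricsDuplicateRetries = new client.Counter({
  name: 'hawk_grouper_duplicate_retries_total',
  help: 'Retries triggered by duplicate-key errors on insert',
  registers: [register],
});
```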

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.

| File | Description |
| --- | --- |
| lib/metrics.ts | New module providing shared Prometheus registry and Pushgateway integration |
| workers/grouper/src/index.ts | Added metrics instrumentation, timer tracking for operations, and memory monitoring utilities |
| runner.ts | Re-enabled metrics initialization, calling the new metrics module for each worker |
| .env.sample | Added environment variables for memory monitoring thresholds |

```ts
 */
export function startMetricsPushing(workerName: string): void {
  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;
  const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000');
```

Copilot AI Feb 18, 2026

The PROMETHEUS_PUSHGATEWAY_INTERVAL environment variable parsing lacks error handling. If the environment variable contains an invalid number, parseInt will return NaN, and this will be passed to setInterval on line 32, which will cause the callback to be invoked immediately and repeatedly in a tight loop, potentially causing performance issues.

Add validation to check if the parsed interval is a valid positive number, and either throw an error or use a sensible default if it's invalid.

Suggested change:

```diff
-  const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000');
+  const DEFAULT_INTERVAL = 10000;
+  const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;
+  const parsedInterval = rawInterval !== undefined ? parseInt(rawInterval, 10) : DEFAULT_INTERVAL;
+  const interval =
+    Number.isFinite(parsedInterval) && parsedInterval > 0
+      ? parsedInterval
+      : (() => {
+          if (rawInterval !== undefined) {
+            console.warn(
+              `Invalid PROMETHEUS_PUSHGATEWAY_INTERVAL "${rawInterval}", falling back to default ${DEFAULT_INTERVAL}ms`,
+            );
+          }
+          return DEFAULT_INTERVAL;
+        })();
```

Comment on lines +5 to +9
```ts
const register = new client.Registry();

client.collectDefaultMetrics({ register });

export { register, client };
```

Copilot AI Feb 18, 2026

The shared global registry could cause issues if multiple worker types are instantiated in the same process. All workers would register their metrics to the same registry, and when metrics are pushed to Pushgateway, metrics from all workers would be included under each worker's grouping labels. This could lead to incorrect or confusing metrics attribution.

Consider either using separate registries per worker type, or ensure metrics are properly labeled to distinguish between different workers when using a shared registry.
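
A per-worker-registry variant could look roughly like the sketch below; createWorkerRegistry is a hypothetical helper name, and setDefaultLabels is shown only to illustrate the labeling alternative.

```ts
// Sketch: one Registry per worker type, so each Pushgateway grouping only ever
// contains that worker's metrics. createWorkerRegistry is a hypothetical helper.
import * as client from 'prom-client';

const registries = new Map<string, client.Registry>();

export function createWorkerRegistry(workerName: string): client.Registry {
  let registry = registries.get(workerName);

  if (registry === undefined) {
    registry = new client.Registry();
    registry.setDefaultLabels({ worker: workerName });
    client.collectDefaultMetrics({ register: registry });
    registries.set(workerName, registry);
  }

  return registry;
}
```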

```ts
setInterval(() => {
  gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => {
    if (err) {
      console.error('Metrics push error:', err);
```

Copilot AI Feb 18, 2026

Using console.error for error logging is inconsistent with the rest of the codebase, which uses a logger module (see lib/logger.ts and usage in workers). The error message will not benefit from structured logging, log levels, or any other logging infrastructure features.

Consider using HawkCatcher or a logger instance consistent with the rest of the codebase for error reporting.

Comment on lines +804 to +813
```diff
  const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' });

  const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);

  encodeUnsafeFields(groupedEventData);

- return (await collection
+ const result = (await collection
    .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;

  endTimer();
```

Copilot AI Feb 18, 2026

The timer for MongoDB operation is not stopped in the error path. If insertOne throws an error (before line 811), the timer started on line 804 will never be stopped, resulting in incorrect duration metrics.

Wrap the database operation in a try-catch-finally block or try-finally block to ensure endTimer() is always called.
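
In other words, the snippet above could be reshaped along these lines (a sketch that mirrors the diff; collection and groupedEventData come from the surrounding saveEvent method):

```ts
// Sketch: stop the Mongo timer on both the success and the error path.
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' });

try {
  return (await collection
    .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;
} finally {
  endTimer();
}
```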

```ts
const gateway = new client.Pushgateway(url, [], register);

console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`);
```

Copilot AI Feb 18, 2026

Using console.log for informational messages is inconsistent with the rest of the codebase, which uses a logger module throughout (see lib/logger.ts and usage in workers like grouper). This message won't benefit from structured logging, log levels, or any other logging infrastructure features.

Consider using a logger instance consistent with the rest of the codebase.

Comment on lines +89 to +91
```ts
this.workers.forEach((worker) => {
  startMetricsPushing(worker.type.replace('/', '_'));
});
```

Copilot AI Feb 18, 2026

The metrics pushing intervals created by startMetricsPushing are never cleaned up when workers are stopped. The runner's stopWorker method on line 227 attempts to clear this.pushIntervalNumber, but this property is no longer set since the metrics pushing logic was moved to lib/metrics.ts. This will cause the intervals to continue running after workers are stopped, leading to potential memory leaks and failed push attempts.

Consider returning the interval ID from startMetricsPushing so it can be stored and cleared when workers are stopped, or implement a cleanup function in the metrics module.

Suggested change:

```diff
- this.workers.forEach((worker) => {
-   startMetricsPushing(worker.type.replace('/', '_'));
- });
+ if (this.workers.length === 0) {
+   return;
+ }
+ const workerTypeForMetrics = this.workers[0].type.replace('/', '_');
+ this.pushIntervalNumber = startMetricsPushing(workerTypeForMetrics);
```
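
Alternatively, the metrics module itself could expose a cleanup handle, along the lines of this sketch (the stop-function return type is an assumption, and the id grouping from the PR is omitted for brevity):

```ts
// Sketch: startMetricsPushing returns a stop function the runner can call in stopWorker.
// Return type and structure are illustrative, not the PR's actual API.
import * as os from 'os';
import * as client from 'prom-client';

const register = new client.Registry();

export function startMetricsPushing(workerName: string): () => void {
  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;

  if (!url) {
    return (): void => { /* pushing disabled, nothing to stop */ };
  }

  const interval = Number(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL) || 10000;
  const gateway = new client.Pushgateway(url, [], register);
  const hostname = os.hostname();

  const timer = setInterval(() => {
    gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname } }, (err) => {
      if (err) {
        console.error('Metrics push error:', err);
      }
    });
  }, interval);

  return (): void => clearInterval(timer);
}
```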

```ts
if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
  this.metricsDuplicateRetries.inc();
  this.logger.info(`[saveEvent] duplicate key, retrying as repetition`);
  await this.handle(task);
```

Copilot AI Feb 18, 2026

Recursive call to this.handle(task) will cause metrics to be double-counted. When a duplicate key error occurs and this line calls handle again, the metrics in the outer handle method (line 227-237) will increment metricsHandleDuration and potentially metricsErrorsTotal again, in addition to the metrics incremented by the recursive call. Additionally, handledTasksCount will be incremented multiple times for the same logical task.

Consider either calling handleInternal(task) directly here to avoid the double counting, or track a flag to prevent metrics from being incremented on retry attempts.

Suggested change:

```diff
- await this.handle(task);
+ await this.handleInternal(task);
```
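
The flag-based alternative could be sketched like this; the isRetry parameter and the processTask helper are hypothetical names, not code from the PR:

```ts
// Sketch: only the outermost handle() call records duration and error metrics;
// the retry triggered by a duplicate-key error passes isRetry = true.
public async handle(task: GrouperWorkerTask, isRetry = false): Promise<void> {
  const endTimer = isRetry ? null : this.metricsHandleDuration.startTimer();

  try {
    await this.processTask(task);
  } catch (error) {
    if (!isRetry) {
      this.metricsErrorsTotal.inc();
    }
    throw error;
  } finally {
    endTimer?.();
  }
}
```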

Comment on lines +763 to +774
```ts
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' });

const result = await this.eventsDb.getConnection()
  .collection(`events:${projectId}`)
  .findOne({
    groupHash,
  })
  .catch((err) => {
    throw new DatabaseReadWriteError(err);
  });

endTimer();
```

Copilot AI Feb 18, 2026

The timer for MongoDB operation should be stopped in the error path. Currently, if findOne throws via the .catch() handler (line 770-772), the timer started on line 763 is never stopped. This will result in incorrect duration metrics.

Move the endTimer() call to execute in both success and error paths, or use a try-finally block to ensure it always executes.

Comment on lines +32 to +38
```ts
setInterval(() => {
  gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => {
    if (err) {
      console.error('Metrics push error:', err);
    }
  });
}, interval);
```

Copilot AI Feb 18, 2026

Multiple timers may be created and never stopped if startMetricsPushing is called multiple times. The function creates a new interval on line 32 without storing or clearing any previous intervals. If multiple workers are started (which happens in the runner on line 89-91), this could create multiple timers that push metrics for each worker type, and there's no cleanup mechanism to stop these intervals.

Consider storing the interval ID and providing a cleanup mechanism, or ensure the function is only called once per worker type.

Comment on lines +57 to +75
```ts
const MB_IN_BYTES = 1_048_576;
const HUNDRED = 100;
const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50;
const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200;
const DEFAULT_MEMORY_GROWTH_WARN_MB = 64;
const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16;
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
const METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000];

function asPositiveNumber(value: string | undefined, fallback: number): number {
  const parsed = Number(value);

  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

const MEMORY_LOG_EVERY_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS);
const MEMORY_GROWTH_WINDOW_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS);
const MEMORY_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB);
const MEMORY_HANDLE_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB);
```
Member

I'd suggest not bloating this single file: move these parameters to a separate one.
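
For example, the thresholds could move into a small config module next to the worker. A sketch, with a hypothetical file name and export names:

```ts
// workers/grouper/src/memory-config.ts (hypothetical file)
// Sketch of pulling the memory-monitoring thresholds out of index.ts.
const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50;
const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200;
const DEFAULT_MEMORY_GROWTH_WARN_MB = 64;
const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16;

function asPositiveNumber(value: string | undefined, fallback: number): number {
  const parsed = Number(value);

  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

export const memoryMonitoringConfig = {
  logEveryTasks: asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS),
  growthWindowTasks: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS),
  growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB),
  handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB),
};
```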

```ts
/**
 * Logs sustained heap growth over a configurable number of handled tasks.
 */
private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void {
```
Member

Can we move the metric-related methods to a separate file?

Member

The main suggestion is to somehow separate the metric collection from the Worker source code, because a code base where business logic is mixed with metrics code will be hard to understand and maintain.
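
One way to keep the worker readable would be to put every metric object behind a small provider class that GrouperWorker only calls into. A rough sketch; GrouperMetrics, its file, and the method names are hypothetical, and the metric names are illustrative:

```ts
// workers/grouper/src/metrics.ts (hypothetical): metric objects and timing helpers
// live here, so index.ts only calls intention-revealing methods.
import { client, register } from '../../../lib/metrics';

export class GrouperMetrics {
  private readonly eventsTotal = new client.Counter({
    name: 'hawk_grouper_events_total',
    help: 'Processed events, by kind',
    labelNames: ['kind'],
    registers: [register],
  });

  private readonly mongoDuration = new client.Histogram({
    name: 'hawk_grouper_mongo_duration_seconds',
    help: 'MongoDB operation durations',
    labelNames: ['operation'],
    registers: [register],
  });

  public countEvent(kind: 'new' | 'repeated'): void {
    this.eventsTotal.inc({ kind });
  }

  public startMongoTimer(operation: string): () => number {
    return this.mongoDuration.startTimer({ operation });
  }
}
```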
