Conversation
Add MongoDB, payload and delta size metrics with OOM-debug logging
Pull request overview
This pull request adds Prometheus metrics instrumentation to the Grouper worker to enable monitoring of event processing. The implementation creates a shared metrics registry, adds various counters and histograms to track events, errors, durations, and data sizes, and sets up automatic periodic pushing to a Prometheus Pushgateway.
Changes:
- Added new `lib/metrics.ts` module with shared Prometheus registry and push functionality
- Instrumented GrouperWorker with 7 metrics: events counter (new/repeated), processing duration, errors, MongoDB operation durations, delta/payload sizes, and duplicate retries
- Re-enabled metrics pushing in runner with simplified initialization
- Added environment variables for memory monitoring configuration
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| lib/metrics.ts | New module providing shared Prometheus registry and pushgateway integration |
| workers/grouper/src/index.ts | Added metrics instrumentation, timer tracking for operations, and memory monitoring utilities |
| runner.ts | Re-enabled metrics initialization calling new metrics module for each worker |
| .env.sample | Added environment variables for memory monitoring thresholds |
```ts
 */
export function startMetricsPushing(workerName: string): void {
  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;
  const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000');
```
The PROMETHEUS_PUSHGATEWAY_INTERVAL environment variable is parsed without error handling. If the variable contains an invalid number, parseInt returns NaN, which is then passed to setInterval on line 32; Node.js treats a NaN delay as 1 ms, so the callback fires in a near-continuous loop and can cause performance issues.
Add validation to check that the parsed interval is a valid positive number, and either throw an error or fall back to a sensible default if it is not.
Suggested change:

```diff
- const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000');
+ const DEFAULT_INTERVAL = 10000;
+ const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;
+ const parsedInterval = rawInterval !== undefined ? parseInt(rawInterval, 10) : DEFAULT_INTERVAL;
+ const interval =
+   Number.isFinite(parsedInterval) && parsedInterval > 0
+     ? parsedInterval
+     : (() => {
+         if (rawInterval !== undefined) {
+           console.warn(
+             `Invalid PROMETHEUS_PUSHGATEWAY_INTERVAL "${rawInterval}", falling back to default ${DEFAULT_INTERVAL}ms`,
+           );
+         }
+         return DEFAULT_INTERVAL;
+       })();
```
```ts
const register = new client.Registry();

client.collectDefaultMetrics({ register });

export { register, client };
```
The shared global registry could cause issues if multiple worker types are instantiated in the same process. All workers would register their metrics to the same registry, and when metrics are pushed to Pushgateway, metrics from all workers would be included under each worker's grouping labels. This could lead to incorrect or confusing metrics attribution.
Consider either using separate registries per worker type, or ensuring that metrics are properly labeled to distinguish between workers when a shared registry is used.
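A minimal sketch of the per-worker-registry option, assuming a small factory in lib/metrics.ts (the function name is illustrative, not existing code):

```ts
import * as client from 'prom-client';

/**
 * Illustrative factory: each worker type gets its own Registry, so a push to
 * the Pushgateway only carries that worker's metrics.
 */
export function createWorkerRegistry(workerName: string): client.Registry {
  const registry = new client.Registry();

  // Label every metric with the worker name so attribution stays clear even
  // if registries are later merged or scraped together.
  registry.setDefaultLabels({ worker: workerName });
  client.collectDefaultMetrics({ register: registry });

  return registry;
}
```

startMetricsPushing could then receive the worker's own registry instead of the shared one when constructing the Pushgateway.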
```ts
setInterval(() => {
  gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => {
    if (err) {
      console.error('Metrics push error:', err);
```
Using console.error for error logging is inconsistent with the rest of the codebase, which uses a logger module (see lib/logger.ts and usage in workers). The error message will not benefit from structured logging, log levels, or any other logging infrastructure features.
Consider using HawkCatcher or a logger instance consistent with the rest of the codebase for error reporting.
```diff
  const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' });

  const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);

  encodeUnsafeFields(groupedEventData);

- return (await collection
+ const result = (await collection
    .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;

  endTimer();
```
The timer for MongoDB operation is not stopped in the error path. If insertOne throws an error (before line 811), the timer started on line 804 will never be stopped, resulting in incorrect duration metrics.
Wrap the database operation in a try-catch-finally block or try-finally block to ensure endTimer() is always called.
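A minimal sketch of the try-finally variant, reusing the identifiers from the hunk above (the rest of the saveEvent method body is assumed, and anything that uses result afterwards would move inside the try block):

```ts
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' });

try {
  const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);

  encodeUnsafeFields(groupedEventData);

  const result = (await collection
    .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;

  return result;
} finally {
  // Runs on both the success and error paths, so the duration is always recorded
  endTimer();
}
```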
```ts
const gateway = new client.Pushgateway(url, [], register);

console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`);
```
Using console.log for informational messages is inconsistent with the rest of the codebase, which uses a logger module throughout (see lib/logger.ts and usage in workers like grouper). This message won't benefit from structured logging, log levels, or any other logging infrastructure features.
Consider using a logger instance consistent with the rest of the codebase.
```ts
this.workers.forEach((worker) => {
  startMetricsPushing(worker.type.replace('/', '_'));
});
```
The metrics pushing intervals created by startMetricsPushing are never cleaned up when workers are stopped. The runner's stopWorker method on line 227 attempts to clear this.pushIntervalNumber, but this property is no longer set since the metrics pushing logic was moved to lib/metrics.ts. This will cause the intervals to continue running after workers are stopped, leading to potential memory leaks and failed push attempts.
Consider returning the interval ID from startMetricsPushing so it can be stored and cleared when workers are stopped, or implement a cleanup function in the metrics module.
Suggested change:

```diff
- this.workers.forEach((worker) => {
-   startMetricsPushing(worker.type.replace('/', '_'));
- });
+ if (this.workers.length === 0) {
+   return;
+ }
+ const workerTypeForMetrics = this.workers[0].type.replace('/', '_');
+ this.pushIntervalNumber = startMetricsPushing(workerTypeForMetrics);
```
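For that suggestion to compile, startMetricsPushing would also need to return the interval handle. A hedged sketch of that counterpart change, assuming it stays in lib/metrics.ts next to the existing register and client exports (the host/id groupings are omitted here for brevity):

```ts
export function startMetricsPushing(workerName: string): NodeJS.Timeout | undefined {
  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;

  if (!url) {
    return undefined;
  }

  const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000', 10);
  const gateway = new client.Pushgateway(url, [], register);

  // Returning the handle lets the runner store it and clearInterval() it in stopWorker()
  return setInterval(() => {
    gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName } }, (err) => {
      if (err) {
        console.error('Metrics push error:', err);
      }
    });
  }, interval);
}
```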
```ts
if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
  this.metricsDuplicateRetries.inc();
  this.logger.info(`[saveEvent] duplicate key, retrying as repetition`);
  await this.handle(task);
```
Recursive call to this.handle(task) will cause metrics to be double-counted. When a duplicate key error occurs and this line calls handle again, the metrics in the outer handle method (lines 227-237) will increment metricsHandleDuration and potentially metricsErrorsTotal again, in addition to the metrics incremented by the recursive call. Additionally, handledTasksCount will be incremented multiple times for the same logical task.
Consider either calling handleInternal(task) directly here to avoid the double counting, or track a flag to prevent metrics from being incremented on retry attempts.
Suggested change:

```diff
- await this.handle(task);
+ await this.handleInternal(task);
```
```ts
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' });

const result = await this.eventsDb.getConnection()
  .collection(`events:${projectId}`)
  .findOne({
    groupHash,
  })
  .catch((err) => {
    throw new DatabaseReadWriteError(err);
  });

endTimer();
```
The timer for the MongoDB operation is not stopped in the error path. Currently, if findOne throws via the .catch() handler (lines 770-772), the timer started on line 763 is never stopped, which results in incorrect duration metrics.
Move the endTimer() call to execute in both success and error paths, or use a try-finally block to ensure it always executes.
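A hedged sketch of the try-finally form that keeps the DatabaseReadWriteError wrapping, reusing the identifiers from the hunk above (anything that uses the result afterwards would move inside the try block):

```ts
const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' });

try {
  return await this.eventsDb.getConnection()
    .collection(`events:${projectId}`)
    .findOne({
      groupHash,
    })
    .catch((err) => {
      // The rethrown DatabaseReadWriteError still passes through the finally block
      throw new DatabaseReadWriteError(err);
    });
} finally {
  // Stops the timer on both success and failure
  endTimer();
}
```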
```ts
setInterval(() => {
  gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => {
    if (err) {
      console.error('Metrics push error:', err);
    }
  });
}, interval);
```
Multiple timers may be created and never stopped if startMetricsPushing is called multiple times. The function creates a new interval on line 32 without storing or clearing any previous intervals. If multiple workers are started (which happens in the runner on lines 89-91), this could create multiple timers that push metrics for each worker type, and there's no cleanup mechanism to stop these intervals.
Consider storing the interval ID and providing a cleanup mechanism, or ensure the function is only called once per worker type.
```ts
const MB_IN_BYTES = 1_048_576;
const HUNDRED = 100;
const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50;
const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200;
const DEFAULT_MEMORY_GROWTH_WARN_MB = 64;
const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16;
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
const METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000];

function asPositiveNumber(value: string | undefined, fallback: number): number {
  const parsed = Number(value);

  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

const MEMORY_LOG_EVERY_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS);
const MEMORY_GROWTH_WINDOW_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS);
const MEMORY_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB);
const MEMORY_HANDLE_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB);
```
I'd suggest not bloating this single file; consider moving these parameters to a separate module.
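A minimal sketch of such an extraction, using a hypothetical workers/grouper/src/memory-config.ts (the file name and export shape are illustrative):

```ts
// workers/grouper/src/memory-config.ts (hypothetical file name)

const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50;
const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200;
const DEFAULT_MEMORY_GROWTH_WARN_MB = 64;
const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16;

/**
 * Parses an environment variable as a positive number, falling back otherwise
 */
function asPositiveNumber(value: string | undefined, fallback: number): number {
  const parsed = Number(value);

  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

export const memoryConfig = {
  logEveryTasks: asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS),
  growthWindowTasks: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS),
  growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB),
  handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB),
};
```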
```ts
/**
 * Logs sustained heap growth over a configurable number of handled tasks.
 */
private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void {
```
Can we move the metric-related methods to a separate file?
The main suggestion is to somehow separate metric collection from the Worker source code, because a code base where business logic is mixed with metrics code will be hard to understand and maintain.
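One hedged way to do that separation is a small metrics helper that the worker delegates to; the class and metric names below are illustrative, not the PR's actual code:

```ts
import * as client from 'prom-client';

/**
 * Illustrative helper that owns all Grouper metrics, keeping the worker's
 * business logic free of prom-client calls.
 */
export class GrouperWorkerMetrics {
  private readonly handleDuration: client.Histogram<string>;
  private readonly errorsTotal: client.Counter<string>;

  constructor(registry: client.Registry) {
    this.handleDuration = new client.Histogram({
      name: 'grouper_handle_duration_seconds',
      help: 'Duration of GrouperWorker.handle() calls',
      registers: [registry],
    });
    this.errorsTotal = new client.Counter({
      name: 'grouper_errors_total',
      help: 'Number of errors thrown while handling tasks',
      registers: [registry],
    });
  }

  /**
   * Wraps a task handler, recording its duration and counting errors
   */
  public async measureHandle<T>(handler: () => Promise<T>): Promise<T> {
    const endTimer = this.handleDuration.startTimer();

    try {
      return await handler();
    } catch (error) {
      this.errorsTotal.inc();
      throw error;
    } finally {
      endTimer();
    }
  }
}
```

The worker would then wrap its task handling in something like `await this.metrics.measureHandle(() => this.handleInternal(task))`, keeping prom-client calls out of the business logic.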
Add Prometheus metrics to Grouper worker: events counter (new/repeated), processing duration histogram, errors counter. Metrics are pushed to Pushgateway every 10s.
grouper-metrics-hawk_worker_grouper_metrics_e2e_1771436868147.log