Skip to content

[SPIKE] Audits using EF#5278

Draft
johnsimons wants to merge 72 commits intomasterfrom
john/audit_ef
Draft

[SPIKE] Audits using EF#5278
johnsimons wants to merge 72 commits intomasterfrom
john/audit_ef

Conversation

@johnsimons
Copy link
Member

No description provided.

Refactors the upsert logic in several data stores to leverage EF Core's change tracking more efficiently.

Instead of creating a new entity and then calling Update, the code now fetches the existing entity (if any) and modifies its properties directly.
This reduces the overhead and potential issues associated with detached entities.

The RecoverabilityIngestionUnitOfWork is also updated to use change tracking for FailedMessageEntity updates.

This commit was made on the `john/more_interfaces` branch.
Adds data store and entities required for persisting licensing and throughput data.

This includes adding new tables for licensing metadata, throughput endpoints, and daily throughput data, as well as configurations and a data store implementation to interact with these tables.
Also added headers to the serialised entity
Updates data stores to utilize IServiceScopeFactory instead of IServiceProvider for creating database scopes.

This change improves dependency injection and resource management,
ensuring proper scope lifecycle management, especially for asynchronous operations.
Adds full-text search capabilities for error messages, allowing users to search within message headers and, optionally, the message body.

Introduces an interface for full-text search providers to abstract the database-specific implementation.

Stores small message bodies inline for faster retrieval and populates a searchable text field from headers and the message body.

Adds configuration option to set the maximum body size to store inline.
Updates Entity Framework Core and related packages to the latest versions.

This ensures compatibility with the latest features and bug fixes in the EF Core ecosystem. It also addresses potential security vulnerabilities and improves overall performance.
Removes the MySQL persistence implementation due to its incomplete state and lack of audit support.
This simplifies the codebase and focuses resources on fully supported persistence options.

The related test projects and SQL persistence files have been removed.
Package versions are updated to align with current versions.
Introduces a setting to control whether message bodies are stored on disk.

This is useful for scenarios where disk space is a concern or message bodies
are not required for auditing purposes. It enhances configuration flexibility.
@johnsimons
Copy link
Member Author

johnsimons commented Feb 3, 2026

Audit Ingestion Parallel Processing Improvements

Executive Summary

Refactored the AuditIngestion class to support parallel database writes, significantly increasing throughput by decoupling transport message dequeuing from database persistence.


Architecture Comparison

Before: Sequential Processing

Transport (MaxConcurrency threads, e.g., 32)
    ↓
Channel<MessageContext> (capacity = MaxConcurrency = 32)
    ↓
Single Consumer (ExecuteAsync loop)
    ↓
Sequential DB Write (one batch at a time)
    ↓
Complete TaskCompletionSource → Ack message

Bottleneck: Single reader processes one batch at a time. All transport threads wait for DB write to complete before their messages are acknowledged.

After: Parallel Processing

Transport (MaxConcurrency threads, e.g., 100)
    ↓
Channel<MessageContext> (capacity = BatchSize × MaxParallelWriters × 2 = 400)
    ↓
Batch Assembler Task (single reader, assembles batches of 50)
    ↓
Channel<List<MessageContext>> (capacity = MaxParallelWriters × 2 = 8)
    ↓
4 Parallel Writer Tasks → Concurrent DB Writes
    ↓
Complete TaskCompletionSource → Ack message

Improvement: Multiple batches write to DB concurrently while transport continues dequeuing into larger buffer.

New Configuration Settings

Setting Default Range Description
AuditIngestionBatchSize 50 1-500 Messages per batch sent to DB
AuditIngestionMaxParallelWriters 4 1-16 Concurrent DB writer tasks
AuditIngestionBatchTimeout 100ms 10ms-5s Max wait time for partial batch to fill
Environment variables:
SERVICECONTROL_AUDIT_AuditIngestionBatchSize=50
SERVICECONTROL_AUDIT_AuditIngestionMaxParallelWriters=4
SERVICECONTROL_AUDIT_AuditIngestionBatchTimeout=00:00:00.100

Key Code Changes

1. Two-Channel Architecture

Before: Single channel from transport to consumer

readonly Channel<MessageContext> channel;

After: Two channels - messages and assembled batches

readonly Channel<MessageContext> messageChannel;      // Transport → Batch Assembler
readonly Channel<List<MessageContext>> batchChannel;  // Batch Assembler → Writers

2. Batch Assembler Task

New BatchAssemblerLoop that:

  • Reads individual messages from messageChannel
  • Assembles batches up to BatchSize
  • Waits up to BatchTimeout for partial batches to fill
  • Writes assembled batches to batchChannel

3. Parallel Writer Tasks

New WriterLoop (runs MaxParallelWriters instances) that:

  • Reads batches from batchChannel concurrently
  • Calls auditIngestor.Ingest() in parallel
  • Completes TaskCompletionSources on success/failure

4. Bug Fixes Applied

Issue Fix
SemaphoreSlim not disposed Added Dispose() override
Cancellation loses in-flight batch Track currentBatch and signal cancellation on shutdown
Task.Run with cancelled token throws immediately Pass CancellationToken.None to Task.Run

Throughput Analysis

Before

  • Transport concurrency: 32 (default)
  • Channel capacity: 32
  • DB writes: Sequential (1 at a time)
  • Effective throughput: 32 messages / DB_write_time

After

  • Transport concurrency: Configurable (e.g., 100)
  • Message channel capacity: 400 (50 × 4 × 2)
  • Batch channel capacity: 8 (4 × 2)
  • DB writes: 4 concurrent
  • Effective throughput: 4 × 50 messages / DB_write_time = 200 messages / DB_write_time
    Theoretical improvement: ~6x throughput (varies based on DB latency and transport speed)

Preserved Guarantees

Guarantee How It's Preserved
At-least-once delivery Each message's TaskCompletionSource is only completed after its batch persists
Message ordering Not required (audit messages are independent)
Back-pressure Bounded channels with FullMode.Wait
Graceful shutdown Drain channels, signal in-flight batches
Error isolation Failed batch only affects messages in that batch

Files Modified

  1. src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
    • Added 3 new configuration properties
    • Added validation methods for each setting
  2. src/ServiceControl.Audit/Auditing/AuditIngestion.cs
    • Complete refactor of ExecuteAsync into multi-task architecture
    • Added BatchAssemblerLoop method
    • Added WriterLoop method
    • Updated StopAsync for graceful multi-task shutdown
    • Added Dispose() override for SemaphoreSlim

DB Connection Impact

Before After
1 connection at a time Up to 4 concurrent connections

This is negligible impact on typical connection pools (default: 100 connections).

Introduces Azure Blob Storage as an alternative to file system storage for audit message bodies.

This change allows configuring Audit Persistence to store message bodies in Azure Blob Storage by providing a connection string, offering scalability and cost-effectiveness. It also adds compression for larger messages to optimize storage.

The existing file system storage remains an option if a path is configured.
Adds support for a dedicated connection string for message body storage.

This allows users to configure a separate database or storage account
specifically for storing large message bodies, potentially improving
performance and scalability.
Stores message bodies to disk in parallel to improve ingestion performance.

Instead of awaiting the completion of each write operation, it queues them,
allowing multiple write tasks to run concurrently.
It then awaits all tasks before saving the changes to the database.
Updates the configuration to no longer default the message body storage path to a location under `CommonApplicationData`. The path will now be empty by default. This change allows users to explicitly configure the storage location, preventing potential issues with default locations.
Refactors the Azure Blob Storage persistence to streamline its configuration.
It removes the direct instantiation of BlobContainerClient within the base class and
instead, registers the AzureBlobBodyStoragePersistence class for dependency injection,
allowing the constructor to handle the BlobContainerClient creation.

Additionally, it ensures that the ContentType metadata stored in Azure Blob Storage is properly encoded and decoded
to handle special characters.

Also, it adds MessageBodyStorageConnectionStringKey to the configuration keys for both PostgreSQL and SQL Server.
Implements data retention policy for audit messages and saga snapshots using a background service.

This change introduces a base `RetentionCleaner` class that handles the logic for deleting expired audit data in batches.  Database-specific implementations are provided for SQL Server and PostgreSQL, leveraging their respective locking mechanisms (sp_getapplock and advisory locks) to prevent concurrent executions of the cleanup process.

Removes the registration of the `RetentionCleaner` from the base class and registers it on specific implementations.

The cleanup process deletes processed messages and saga snapshots older than the configured retention period, optimizing database space and improving query performance.
Wraps retention cleanup process in an execution strategy
to handle transient database errors. Moves lock check
to inside the execution strategy, and only logs success
if the lock was acquired.
Resets the total deleted messages and snapshots counters,
as well as the lockAcquired flag, on each retry attempt of
the retention cleaner process. This prevents accumulation
of values across retries when the execution strategy is used.

Also, updates lock acquisition logic to use `AsAsyncEnumerable()`
to prevent errors caused by non-composable SQL in
`SqlQueryRaw` calls.
Adds metrics to monitor the retention cleanup process. This includes metrics for cleanup cycle duration, batch duration, deleted messages, skipped locks, and consecutive failures.

These metrics provide insights into the performance and health of the retention cleanup process, allowing for better monitoring and troubleshooting.
Introduces ingestion throttling during retention cleanup to reduce contention.

This change adds an `IngestionThrottleState` to manage the throttling. The retention cleaner now signals when cleanup starts and ends, and the audit ingestion process respects the current writer limit.

A new `RetentionCleanupBatchDelay` setting is introduced to add a delay between processing batches of messages.

Adds a capacity metric to monitor the current ingestion capacity.
Corrects an issue where endpoint reconciliation could lead to incorrect "LastSeen" values when endpoints are deleted and re-added.

The previous implementation aggregated LastSeen values across all deleted records, potentially resulting in an outdated value being used.

This change introduces a ranking mechanism to select the most recent LastSeen value for each KnownEndpointId during reconciliation. This ensures that the latest LastSeen value is used, improving the accuracy of endpoint activity tracking.
Ensures distinct message IDs are deleted during retention cleanup.

Adjusts the loop condition to continue deleting messages as long as the number of deleted items is greater than or equal to the batch size. This prevents premature termination of the cleanup process when a batch returns exactly the batch size, ensuring all eligible messages are removed.
Refactors the audit retention cleanup process to ensure reliability and prevent race conditions.

It achieves this by:
- Using session-level locks to maintain lock ownership across transactions, preventing premature lock release.
- Encapsulating the entire cleanup process within a single lock, simplifying retry logic and ensuring all operations are executed by the same instance.
- Wrapping each batch deletion in its own execution strategy and transaction to handle transient errors and maintain data consistency.
Implements batching of audit data for efficient retention cleanup.

Instead of processing individual messages, records are now grouped into batches
based on a configurable time interval. This approach significantly reduces the
number of database operations and improves cleanup performance, especially for
large datasets.

A new `BatchId` property is added to `ProcessedMessageEntity` and
`SagaSnapshotEntity` to facilitate batch management.  A background service
rotates the `BatchId` on a timer to define the batches.

The `IBodyStoragePersistence` interface has been updated to use `BatchId`
instead of message ids for deletion.

Retention cleanup now identifies complete batches where all messages are expired
and deletes them together with associated body files.
Uses a dedicated connection for retention cleanup locking to prevent lock release during DbContext connection retries.

Session-level locks are tied to the connection, and the DbContext's retry execution strategy can drop and reopen connections, silently releasing the lock. This change ensures the lock remains stable, even if the DbContext's connection is interrupted.
Refactors retention cleaner to use index seeks for identifying expired data.

This change enhances performance by efficiently querying for expired messages and snapshots, while also ensuring complete batch deletion.
Refactors retention cleaner to improve batch cleanup efficiency.

This change postpones batch deletion until after all expired messages have been deleted.  It then efficiently identifies and deletes only those batch folders that are truly empty. This avoids redundant existence checks during the main deletion loop.
Replaces the existing row-by-row deletion strategy with a batch-oriented approach, significantly improving the performance of audit retention cleanup.

This change introduces provider-specific batch delete SQL, eliminating the need to round-trip thousands of IDs as parameters. It leverages efficient index-only scans and server-side DELETE TOP operations to optimize lock acquisition and reduce contention with ingestion.
Refactors retention cleanup to use server-side batch operations
for deleting expired audit messages and retrieving affected batch IDs.
This avoids separate SELECT queries and reduces database round-trips,
improving performance for large datasets.
Refactors the audit retention cleaner to delete expired batches of messages and saga snapshots more efficiently.

Instead of deleting individual messages/snapshots and then cleaning up body storage, the cleaner now finds fully expired batches based on the maximum `ProcessedAt` timestamp, deletes body storage for these batches first, then deletes the messages/snapshots within those batches. This approach reduces the number of queries and improves performance.

Also, reduces the initial delay before the retention cleaner starts.
Reduces the number of composite indexes to improve write performance and simplify index maintenance.

Removes indexes that are no longer necessary based on query patterns, and adds indexes on BatchId and TimeSent for more efficient data retrieval.

This change reduces index redundancy and avoids the need to update indexes upon each write.
Increases the command timeout for database migrations to 5 minutes
to prevent failures due to long-running migrations. This change
addresses potential issues where the default timeout is insufficient,
especially in environments with large databases or slow network connections.
The original timeout is restored after the migration is complete.
Extends the database migration timeout for both PostgreSQL and SQL Server Audit databases from 5 minutes to 15 minutes.

This provides more time for the migration process to complete, especially in environments with large databases or slower performance, reducing the risk of timeouts during migration.
Restores the composite index on BatchId and ProcessedAt for both ProcessedMessages and SagaSnapshots tables.

This improves query performance for retention policies that rely on both batch ID and processing time, ensuring efficient data retrieval.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant