[SPIKE] Audits using EF #5278

Draft
johnsimons wants to merge 61 commits into master from john/audit_ef

Conversation

@johnsimons
Member

No description provided.

Refactors the upsert logic in several data stores to leverage EF Core's change tracking more efficiently.

Instead of creating a new entity and then calling Update, the code now fetches the existing entity (if any) and modifies its properties directly.
This reduces the overhead and potential issues associated with detached entities.

The RecoverabilityIngestionUnitOfWork is also updated to use change tracking for FailedMessageEntity updates.

This commit was made on the `john/more_interfaces` branch.
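
A minimal sketch of the new pattern (the `DbSet` and property names here are illustrative, not the actual ones):

```csharp
var existing = await dbContext.FailedMessages
    .FirstOrDefaultAsync(m => m.Id == incoming.Id, token);

if (existing is null)
{
    dbContext.FailedMessages.Add(incoming);   // no existing row: plain insert
}
else
{
    // Mutate the tracked entity; the change tracker generates a targeted
    // UPDATE, avoiding Update() on a detached instance.
    existing.Status = incoming.Status;
    existing.LastModified = incoming.LastModified;
}

await dbContext.SaveChangesAsync(token);
```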
Adds data store and entities required for persisting licensing and throughput data.

This includes adding new tables for licensing metadata, throughput endpoints, and daily throughput data, as well as configurations and a data store implementation to interact with these tables.
Also adds headers to the serialised entity.
Updates data stores to utilize IServiceScopeFactory instead of IServiceProvider for creating database scopes.

This change improves dependency injection and resource management,
ensuring proper scope lifecycle management, especially for asynchronous operations.
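
A minimal sketch of the scoped pattern, assuming a hypothetical `AuditDbContext` registered as a scoped service:

```csharp
// Sketch only; AuditDbContext and the data store shape are illustrative.
class ErrorMessageDataStore(IServiceScopeFactory scopeFactory)
{
    public async Task StoreAsync(FailedMessageEntity message, CancellationToken token)
    {
        // CreateAsyncScope ties the scope's lifetime to this operation and
        // disposes IAsyncDisposable services correctly.
        await using var scope = scopeFactory.CreateAsyncScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<AuditDbContext>();
        dbContext.Add(message);
        await dbContext.SaveChangesAsync(token);
    }
}
```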
Adds full-text search capabilities for error messages, allowing users to search within message headers and, optionally, the message body.

Introduces an interface for full-text search providers to abstract the database-specific implementation.

Stores small message bodies inline for faster retrieval and populates a searchable text field from headers and the message body.

Adds configuration option to set the maximum body size to store inline.
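
The provider abstraction might look roughly like this (interface and member names are hypothetical):

```csharp
// Hypothetical shape of the full-text search abstraction.
public interface IFullTextSearchProvider
{
    // Translates a user query into a database-specific FTS predicate,
    // e.g. CONTAINS(...) on SQL Server or to_tsquery(...) on PostgreSQL.
    IQueryable<FailedMessageEntity> Search(
        IQueryable<FailedMessageEntity> source, string query);
}
```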
Ensures that the reconciliation process in the insert-only table reconciler is transactional by wrapping the batch reconciliation within a database transaction.

This prevents partial updates and maintains data consistency.
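A minimal sketch of the transactional wrapper (the batch step is a hypothetical placeholder):

```csharp
// Either the whole batch reconciles or none of it does.
await using var transaction = await dbContext.Database.BeginTransactionAsync(token);
await ReconcileBatchAsync(dbContext, token);  // hypothetical batch reconciliation step
await transaction.CommitAsync(token);
```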

Refactors the KnownEndpointsReconciler to use a table variable to store deleted records before aggregation, improving performance and readability of the SQL MERGE statement.
johnsimons force-pushed the john/audit_ef branch 2 times, most recently from cfcf1c9 to 9106c1f on February 2, 2026 06:06
Refactors audit ingestion to use batching and concurrency for improved performance.
Introduces configurable batch size, parallel writers, and batch timeout settings.
This change optimizes the ingestion process by assembling messages into batches and processing them concurrently, leading to higher throughput and reduced latency.
Combines headers and body into a single `SearchableContent`
column for full-text indexing. This simplifies FTS queries and
improves search performance.

Also refactors `AuditIngestionUnitOfWork` to utilize the new
`SearchableContent` property and streamline body storage logic.

Removes obsolete migration files related to previous FTS
implementations.
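
Populating the combined column might look roughly like this (property and variable names are illustrative):

```csharp
// Concatenate header key/value pairs and the decoded body into one
// searchable string for the FTS index.
entity.SearchableContent = string.Join(
    Environment.NewLine,
    headers.Select(h => $"{h.Key}: {h.Value}")
           .Append(bodyText ?? string.Empty));
```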
Updates Entity Framework Core and related packages to the latest versions.

This ensures compatibility with the latest features and bug fixes in the EF Core ecosystem. It also addresses potential security vulnerabilities and improves overall performance.
Removes the MySQL persistence implementation due to its incomplete state and lack of audit support.
This simplifies the codebase and focuses resources on fully supported persistence options.

The related test projects and SQL persistence files have been removed.
Package versions are updated to align with current versions.
Introduces a setting to control whether message bodies are stored on disk.

This is useful for scenarios where disk space is a concern or message bodies
are not required for auditing purposes. It enhances configuration flexibility.
@johnsimons
Member Author

johnsimons commented Feb 3, 2026

Audit Ingestion Parallel Processing Improvements

Executive Summary

Refactored the AuditIngestion class to support parallel database writes, significantly increasing throughput by decoupling transport message dequeuing from database persistence.


Architecture Comparison

Before: Sequential Processing

```
Transport (MaxConcurrency threads, e.g., 32)
    ↓
Channel<MessageContext> (capacity = MaxConcurrency = 32)
    ↓
Single Consumer (ExecuteAsync loop)
    ↓
Sequential DB Write (one batch at a time)
    ↓
Complete TaskCompletionSource → Ack message
```

Bottleneck: Single reader processes one batch at a time. All transport threads wait for DB write to complete before their messages are acknowledged.

After: Parallel Processing

```
Transport (MaxConcurrency threads, e.g., 100)
    ↓
Channel<MessageContext> (capacity = BatchSize × MaxParallelWriters × 2 = 400)
    ↓
Batch Assembler Task (single reader, assembles batches of 50)
    ↓
Channel<List<MessageContext>> (capacity = MaxParallelWriters × 2 = 8)
    ↓
4 Parallel Writer Tasks → Concurrent DB Writes
    ↓
Complete TaskCompletionSource → Ack message
```

Improvement: Multiple batches write to DB concurrently while transport continues dequeuing into larger buffer.

New Configuration Settings

| Setting | Default | Range | Description |
| --- | --- | --- | --- |
| AuditIngestionBatchSize | 50 | 1-500 | Messages per batch sent to DB |
| AuditIngestionMaxParallelWriters | 4 | 1-16 | Concurrent DB writer tasks |
| AuditIngestionBatchTimeout | 100ms | 10ms-5s | Max wait time for a partial batch to fill |

Environment variables:

```
SERVICECONTROL_AUDIT_AuditIngestionBatchSize=50
SERVICECONTROL_AUDIT_AuditIngestionMaxParallelWriters=4
SERVICECONTROL_AUDIT_AuditIngestionBatchTimeout=00:00:00.100
```

Key Code Changes

1. Two-Channel Architecture

Before: single channel from transport to consumer

```csharp
readonly Channel<MessageContext> channel;
```

After: two channels - messages and assembled batches

```csharp
readonly Channel<MessageContext> messageChannel;      // Transport → Batch Assembler
readonly Channel<List<MessageContext>> batchChannel;  // Batch Assembler → Writers
```

2. Batch Assembler Task

New BatchAssemblerLoop that:

  • Reads individual messages from messageChannel
  • Assembles batches up to BatchSize
  • Waits up to BatchTimeout for partial batches to fill
  • Writes assembled batches to batchChannel
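
A condensed sketch of such a loop, assuming the channel fields shown earlier plus `batchSize` and `batchTimeout` settings:

```csharp
async Task BatchAssemblerLoop(CancellationToken token)
{
    var batch = new List<MessageContext>(batchSize);
    while (await messageChannel.Reader.WaitToReadAsync(token))
    {
        using var timeout = CancellationTokenSource.CreateLinkedTokenSource(token);
        timeout.CancelAfter(batchTimeout);
        try
        {
            // Fill the batch until it is full or the batch timeout elapses.
            while (batch.Count < batchSize &&
                   await messageChannel.Reader.WaitToReadAsync(timeout.Token))
            {
                while (batch.Count < batchSize &&
                       messageChannel.Reader.TryRead(out var message))
                {
                    batch.Add(message);
                }
            }
        }
        catch (OperationCanceledException) when (!token.IsCancellationRequested)
        {
            // Batch timeout elapsed; flush the partial batch below.
        }

        if (batch.Count > 0)
        {
            await batchChannel.Writer.WriteAsync(batch, token);
            batch = new List<MessageContext>(batchSize);
        }
    }
    batchChannel.Writer.Complete();
}
```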

3. Parallel Writer Tasks

New WriterLoop (runs MaxParallelWriters instances) that:

  • Reads batches from batchChannel concurrently
  • Calls auditIngestor.Ingest() in parallel
  • Completes TaskCompletionSources on success/failure
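
A sketch of one writer instance; `GetCompletionSource` stands in for however the real code associates a `TaskCompletionSource` with each message:

```csharp
async Task WriterLoop(CancellationToken token)
{
    await foreach (var batch in batchChannel.Reader.ReadAllAsync(token))
    {
        try
        {
            await auditIngestor.Ingest(batch);
            foreach (var context in batch)
            {
                // Ack only after the batch has persisted (at-least-once).
                GetCompletionSource(context).TrySetResult();
            }
        }
        catch (Exception exception)
        {
            // Error isolation: only this batch's messages are faulted.
            foreach (var context in batch)
            {
                GetCompletionSource(context).TrySetException(exception);
            }
        }
    }
}
```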

4. Bug Fixes Applied

| Issue | Fix |
| --- | --- |
| SemaphoreSlim not disposed | Added Dispose() override |
| Cancellation loses in-flight batch | Track currentBatch and signal cancellation on shutdown |
| Task.Run with cancelled token throws immediately | Pass CancellationToken.None to Task.Run |

Throughput Analysis

Before

  • Transport concurrency: 32 (default)
  • Channel capacity: 32
  • DB writes: Sequential (1 at a time)
  • Effective throughput: 32 messages / DB_write_time

After

  • Transport concurrency: Configurable (e.g., 100)
  • Message channel capacity: 400 (50 × 4 × 2)
  • Batch channel capacity: 8 (4 × 2)
  • DB writes: 4 concurrent
  • Effective throughput: 4 × 50 messages / DB_write_time = 200 messages / DB_write_time
    Theoretical improvement: ~6x throughput (varies based on DB latency and transport speed)

Preserved Guarantees

| Guarantee | How It's Preserved |
| --- | --- |
| At-least-once delivery | Each message's TaskCompletionSource is only completed after its batch persists |
| Message ordering | Not required (audit messages are independent) |
| Back-pressure | Bounded channels with FullMode.Wait |
| Graceful shutdown | Drain channels, signal in-flight batches |
| Error isolation | Failed batch only affects messages in that batch |

Files Modified

  1. src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
    • Added 3 new configuration properties
    • Added validation methods for each setting
  2. src/ServiceControl.Audit/Auditing/AuditIngestion.cs
    • Complete refactor of ExecuteAsync into multi-task architecture
    • Added BatchAssemblerLoop method
    • Added WriterLoop method
    • Updated StopAsync for graceful multi-task shutdown
    • Added Dispose() override for SemaphoreSlim

DB Connection Impact

| Before | After |
| --- | --- |
| 1 connection at a time | Up to 4 concurrent connections |

This has negligible impact on typical connection pools (default: 100 connections).

Introduces Azure Blob Storage as an alternative to file system storage for audit message bodies.

This change allows configuring Audit Persistence to store message bodies in Azure Blob Storage by providing a connection string, offering scalability and cost-effectiveness. It also adds compression for larger messages to optimize storage.

The existing file system storage remains an option if a path is configured.
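A rough sketch of the upload path using `Azure.Storage.Blobs`; the real change compresses only larger bodies, whereas this compresses unconditionally for brevity:

```csharp
using Azure.Storage.Blobs;
using System.IO.Compression;

async Task StoreBodyAsync(BlobContainerClient container, string messageId, byte[] body)
{
    await using var buffer = new MemoryStream();
    await using (var gzip = new GZipStream(buffer, CompressionLevel.Fastest, leaveOpen: true))
    {
        await gzip.WriteAsync(body);  // gzip the body before upload
    }
    buffer.Position = 0;

    await container.UploadBlobAsync(messageId, buffer);
}
```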
Adds support for a dedicated connection string for message body storage.

This allows users to configure a separate database or storage account
specifically for storing large message bodies, potentially improving
performance and scalability.
Stores message bodies to disk in parallel to improve ingestion performance.

Instead of awaiting the completion of each write operation, it queues them,
allowing multiple write tasks to run concurrently.
It then awaits all tasks before saving the changes to the database.
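In sketch form, with `bodyStorage.StoreAsync` as a hypothetical write operation:

```csharp
// Queue all body writes first, then await them together before persisting.
var pendingWrites = new List<Task>();
foreach (var message in batch)
{
    pendingWrites.Add(bodyStorage.StoreAsync(message.Id, message.Body, token));
}

await Task.WhenAll(pendingWrites);
await dbContext.SaveChangesAsync(token);
```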
Updates the configuration to no longer default the message body storage path to a location under `CommonApplicationData`; the path is now empty by default, so users must explicitly configure the storage location rather than relying on an implicit default.
Refactors the Azure Blob Storage persistence to streamline its configuration.
It removes the direct instantiation of BlobContainerClient from the base class and
instead registers the AzureBlobBodyStoragePersistence class for dependency injection,
letting its constructor create the BlobContainerClient.

Additionally, it ensures that the ContentType metadata stored in Azure Blob Storage is properly encoded and decoded
to handle special characters.

Also, it adds MessageBodyStorageConnectionStringKey to the configuration keys for both PostgreSQL and SQL Server.
Implements data retention policy for audit messages and saga snapshots using a background service.

This change introduces a base `RetentionCleaner` class that handles the logic for deleting expired audit data in batches. Database-specific implementations are provided for SQL Server and PostgreSQL, leveraging their respective locking mechanisms (sp_getapplock and advisory locks) to prevent concurrent executions of the cleanup process.

Removes the registration of the `RetentionCleaner` from the base class and registers it on specific implementations.

The cleanup process deletes processed messages and saga snapshots older than the configured retention period, optimizing database space and improving query performance.
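A sketch of the batched delete loop on SQL Server (table and column names are illustrative); note the loop continues while a full batch was deleted:

```csharp
int deleted;
do
{
    // TOP is parameterized; EF Core turns the interpolated values into
    // SQL parameters rather than inlining them.
    deleted = await dbContext.Database.ExecuteSqlInterpolatedAsync($"""
        DELETE TOP ({batchSize}) FROM ProcessedMessages
        WHERE ProcessedAt < {cutoff}
        """, token);
} while (deleted >= batchSize);
```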
Wraps the retention cleanup process in an execution strategy
to handle transient database errors. Moves the lock check
inside the execution strategy, and only logs success
if the lock was acquired.
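Roughly, with hypothetical helper methods for the lock check and cleanup body:

```csharp
// Work inside ExecuteAsync can be retried as a unit on transient failures.
var strategy = dbContext.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(async () =>
{
    if (!await TryAcquireLockAsync(dbContext, token))  // hypothetical lock check
    {
        return;  // another instance holds the lock; skip this cycle
    }

    await RunCleanupBatchesAsync(dbContext, token);    // hypothetical cleanup body
    logger.LogInformation("Retention cleanup completed");
});
```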
Resets the total deleted messages and snapshots counters,
as well as the lockAcquired flag, on each retry attempt of
the retention cleaner process. This prevents accumulation
of values across retries when the execution strategy is used.

Also, updates lock acquisition logic to use `AsAsyncEnumerable()`
to prevent errors caused by non-composable SQL in
`SqlQueryRaw` calls.
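A sketch of the lock acquisition after the fix; because `EXEC` is not composable SQL, the result is streamed with `AsAsyncEnumerable()` instead of being composed over as an `IQueryable` (the resource name here is illustrative):

```csharp
var lockAcquired = false;
await foreach (var status in dbContext.Database
    .SqlQueryRaw<int>("""
        DECLARE @result int;
        EXEC @result = sp_getapplock @Resource = 'AuditRetentionCleanup',
             @LockMode = 'Exclusive', @LockOwner = 'Session', @LockTimeout = 0;
        SELECT @result AS Value;
        """)
    .AsAsyncEnumerable()
    .WithCancellation(token))
{
    lockAcquired = status >= 0;  // sp_getapplock returns >= 0 when granted
}
```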
Adds metrics to monitor the retention cleanup process. This includes metrics for cleanup cycle duration, batch duration, deleted messages, skipped locks, and consecutive failures.

These metrics provide insights into the performance and health of the retention cleanup process, allowing for better monitoring and troubleshooting.
Introduces ingestion throttling during retention cleanup to reduce contention.

This change adds an `IngestionThrottleState` to manage the throttling. The retention cleaner now signals when cleanup starts and ends, and the audit ingestion process respects the current writer limit.

A new `RetentionCleanupBatchDelay` setting is introduced to add a delay between processing batches of messages.

Adds a capacity metric to monitor the current ingestion capacity.
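A hypothetical shape for the shared state (the real type may differ):

```csharp
// Shared between the retention cleaner and the ingestion writer pool.
public class IngestionThrottleState(int maxWriters)
{
    volatile int writerLimit = maxWriters;

    // Read by audit ingestion before dispatching to writer tasks.
    public int CurrentWriterLimit => writerLimit;

    // Signalled by the retention cleaner around a cleanup cycle.
    public void CleanupStarted() => writerLimit = 1;
    public void CleanupEnded() => writerLimit = maxWriters;
}
```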
Corrects an issue where endpoint reconciliation could lead to incorrect "LastSeen" values when endpoints are deleted and re-added.

The previous implementation aggregated LastSeen values across all deleted records, potentially resulting in an outdated value being used.

This change introduces a ranking mechanism to select the most recent LastSeen value for each KnownEndpointId during reconciliation. This ensures that the latest LastSeen value is used, improving the accuracy of endpoint activity tracking.
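The ranking is presumably a window function such as `ROW_NUMBER()` in the MERGE source; in LINQ terms the intent is equivalent to:

```csharp
// For each endpoint, keep only the most recent LastSeen among deleted rows.
var latestPerEndpoint = deletedRecords
    .GroupBy(r => r.KnownEndpointId)
    .Select(g => g.OrderByDescending(r => r.LastSeen).First());
```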
Ensures distinct message IDs are deleted during retention cleanup.

Adjusts the loop condition to continue deleting messages as long as the number of deleted items is greater than or equal to the batch size. This prevents premature termination of the cleanup process when a batch returns exactly the batch size, ensuring all eligible messages are removed.
Refactors the audit retention cleanup process to ensure reliability and prevent race conditions.

It achieves this by:
- Using session-level locks to maintain lock ownership across transactions, preventing premature lock release.
- Encapsulating the entire cleanup process within a single lock, simplifying retry logic and ensuring all operations are executed by the same instance.
- Wrapping each batch deletion in its own execution strategy and transaction to handle transient errors and maintain data consistency.