I’ve been experimenting with telemetry sampling to avoid hitting the Application Insights daily log cap. Ideally, I want to apply sampling to everything but exclude exceptions and preserve the related traces (same operation id) for those exceptions.

I’ve created a sample console app to test things, and so far I can successfully sample and preserve exceptions. But the related traces get sampled as well.

I looked at implementing a custom ITelemetryProcessor, but it processes one entry at a time, so I’m not sure it is even possible with a custom processor. Maybe there is something else that helps achieve the desired behavior.
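
For reference, the ITelemetryProcessor interface exposes only a single per-item callback, which is why a whole-operation decision isn't natural here:

// The interface from Microsoft.ApplicationInsights.Extensibility:
public interface ITelemetryProcessor
{
    void Process(ITelemetry item);
}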

The Program.cs code is below:

using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

const string appInsightConnString = "<connection string>";
const double samplingPercentage = 50;

var services = new ServiceCollection();

// add context
var context = new Context();
services.AddSingleton(context);

// configure application insights
services.AddApplicationInsightsTelemetryWorkerService(
    (options) =>
    {
        // disable adaptive sampling
        options.EnableAdaptiveSampling = false;
        options.ConnectionString = appInsightConnString;
    });

// configure logging
services.AddLogging(loggingBuilder =>
{
    loggingBuilder.ClearProviders();
    loggingBuilder.Services.AddSingleton<ILoggerProvider, ContextApplicationInsightsLoggerProvider>();
    loggingBuilder.AddConsole();
});

var serviceProvider = services.BuildServiceProvider();

// setup sampling
var telemetryConfiguration = serviceProvider.GetRequiredService<TelemetryConfiguration>();
var telemetryBuilder = telemetryConfiguration.DefaultTelemetrySink.TelemetryProcessorChainBuilder;
telemetryBuilder.UseSampling(samplingPercentage, excludedTypes: "Exception");
telemetryBuilder.Build();

// get logger
var logger = serviceProvider.GetRequiredService<ILogger<Program>>();

// do something important
DoWork(context, logger);

// Explicitly calling Flush() followed by a short delay is required in console apps.
// This ensures telemetry is sent to the back end even if the application terminates.
var telemetryClient = serviceProvider.GetRequiredService<TelemetryClient>();
telemetryClient.Flush();
Task.Delay(10000).Wait();

Console.WriteLine("Flushed. Press any key to exit");
Console.ReadKey();


static void DoWork(Context context, ILogger logger)
{
    const int iterations = 50;
    const int errors = 15;

    // session Id to filter logs
    var sessionId = Guid.NewGuid().ToString();
    Console.WriteLine($"Session Id: {sessionId}");

    // randomize errors
    var random = new Random();
    var errorsHash = new HashSet<int>();
    while (errorsHash.Count < errors)
    {
        errorsHash.Add(random.Next(0, iterations));
    }

    // log
    for (var i = 0; i < iterations; i++)
    {
        context.CorrelationId = Guid.NewGuid().ToString();
        logger.LogInformation($"Begin operation: {context.CorrelationId}. Session Id: {sessionId}");
        if (errorsHash.Contains(i))
            logger.LogError(new Exception("test ex"), $"Error operation: {context.CorrelationId}. Session Id: {sessionId}");
        logger.LogInformation($"End operation: {context.CorrelationId}. Session Id: {sessionId}");
    }
}
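
The Context and ContextApplicationInsightsLoggerProvider types referenced above are not shown. As a rough, hypothetical sketch (an assumption, not the original code), Context could be a simple correlation-id holder, and the operation id could be stamped onto telemetry with a telemetry initializer:

// Hypothetical sketch only; the original types are not shown in the question.
public class Context
{
    public string? CorrelationId { get; set; }
}

// One plausible way to stamp the operation id onto every telemetry item,
// so related traces share the same operation id.
public class ContextTelemetryInitializer : ITelemetryInitializer
{
    private readonly Context _context;

    public ContextTelemetryInitializer(Context context) => _context = context;

    public void Initialize(ITelemetry telemetry)
    {
        // Copy the current correlation id into the operation id if unset.
        if (string.IsNullOrEmpty(telemetry.Context.Operation.Id))
            telemetry.Context.Operation.Id = _context.CorrelationId;
    }
}

An initializer like this would be registered with services.AddSingleton&lt;ITelemetryInitializer, ContextTelemetryInitializer&gt;() before building the service provider.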

2 Answers


  1. Chosen as BEST ANSWER

    Just wanted to post what I've come up with so far after looking more into implementing a custom telemetry processor. The idea is to keep accumulating buckets of telemetry for some time before making the sampling decision. It introduces a bit of lag, but that's the best I've got so far.

    using Microsoft.ApplicationInsights.Channel;
    using Microsoft.ApplicationInsights.DataContracts;
    using Microsoft.ApplicationInsights.Extensibility;
    using Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel;
    using Microsoft.Extensions.Caching.Memory;
    using Microsoft.Extensions.Primitives;
    using System.Collections.Concurrent;
    
    /// <summary>
    /// Samples only successful telemetry chains; chains containing an exception pass through unsampled.
    /// </summary>
    public class SuccessfulSamplingTelemetryProcessor : ITelemetryProcessor
    {
        private readonly TimeSpan _bufferedTime;
        private readonly ITelemetryProcessor _next;
        private readonly SamplingTelemetryProcessor _samplingTelemetryProcessor;
        private readonly MemoryCache _cache;
    
        public SuccessfulSamplingTelemetryProcessor(double successfulSamplingPercentage, TimeSpan bufferedTime, ITelemetryProcessor next)
        {
            _bufferedTime = bufferedTime;
            _next = next;
            _samplingTelemetryProcessor = new SamplingTelemetryProcessor(next)
            {
                SamplingPercentage = successfulSamplingPercentage
            };
            _cache = new MemoryCache(new MemoryCacheOptions());
        }
    
        public void Process(ITelemetry item)
        {
            // get operation id 
            var operationId = item.Context.Operation.Id;
            if (string.IsNullOrEmpty(operationId))
            {
                // sample by default without correlation id
                _samplingTelemetryProcessor.Process(item);
                return;
            }
    
            var queue = _cache.GetOrCreate(operationId, entry =>
            {
                // absolute expiration alone doesn't trigger the eviction callback promptly,
                // so use a cancellation token to force timely eviction
                // see https://stackoverflow.com/questions/42535408/net-core-memorycache-postevictioncallback-not-working-properly
                var expirationToken = new CancellationChangeToken(new CancellationTokenSource(_bufferedTime).Token);
                entry.AbsoluteExpirationRelativeToNow = _bufferedTime;
                entry.Priority = CacheItemPriority.NeverRemove;
                entry.AddExpirationToken(expirationToken);
                entry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration
                {
                    EvictionCallback = OnPostEviction
                });
                return new ConcurrentQueue<ITelemetry>();
            });
    
            // just add to the queue for now
            queue.Enqueue(item);
        }
    
        private void OnPostEviction(object key, object? value, EvictionReason reason, object? state)
        {
            var queue = (ConcurrentQueue<ITelemetry>)value;
    
            // check if there is exception in the chain 
            var hasException = queue.Any(t => t is ExceptionTelemetry);
            while (queue.TryDequeue(out var telemetry))
            {
                if (hasException)
                {
                    // pass through
                    _next.Process(telemetry);
                }
                else
                {
                    // apply sampling
                    _samplingTelemetryProcessor.Process(telemetry);
                }
            }   
        }
    }
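
    To wire this processor into the pipeline, it would replace the plain UseSampling call from the question, something like this sketch (reusing samplingPercentage from the question's Program.cs; the 30-second buffer is an arbitrary choice):

    var telemetryConfiguration = serviceProvider.GetRequiredService<TelemetryConfiguration>();
    var telemetryBuilder = telemetryConfiguration.DefaultTelemetrySink.TelemetryProcessorChainBuilder;
    // buffer each operation's telemetry, then sample only exception-free chains
    telemetryBuilder.Use(next => new SuccessfulSamplingTelemetryProcessor(
        samplingPercentage, TimeSpan.FromSeconds(30), next));
    telemetryBuilder.Build();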
    

  2. The code below uses Microsoft Application Insights for monitoring and logging within a console application, utilizing the ITelemetryProcessor interface from Application Insights. The CustomTelemetryProcessor class implements ITelemetryProcessor and preserves entries related to exceptions, along with the excluded exception items themselves.

    I referred to the Azure Monitor documentation on filtering and preprocessing in the Application Insights SDK.

    • A custom telemetry processor (CustomTelemetryProcessor) is used, which preserves all telemetry items related to exceptions based on their operation ID. Sampling is also employed, excluding exceptions (excludedTypes: "Exception"), via the default telemetry processor chain.
    using Microsoft.ApplicationInsights;
    using Microsoft.ApplicationInsights.Channel;
    using Microsoft.ApplicationInsights.DataContracts;
    using Microsoft.ApplicationInsights.Extensibility;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    internal class Program
    {
        private const string AppInsightConnectionString = "<connection string>";
        private const double SamplingPercentage = 50;
    
        public static async Task Main(string[] args)
        {
            // Create a ServiceCollection
            var services = new ServiceCollection();
    
            // Configure Application Insights telemetry
            services.AddApplicationInsightsTelemetryWorkerService(options =>
            {
                options.EnableAdaptiveSampling = false;
                options.ConnectionString = AppInsightConnectionString;
            });
    
            // Configure logging
            services.AddLogging(loggingBuilder =>
            {
                loggingBuilder.AddConsole();
                // the string overload expects an instrumentation key;
                // use the connection-string-based overload instead
                loggingBuilder.AddApplicationInsights(
                    config => config.ConnectionString = AppInsightConnectionString,
                    options => { });
            });
    
            // Build the service provider
            var serviceProvider = services.BuildServiceProvider();
    
            // Configure telemetry pipeline with custom processor and sampling
            var telemetryConfiguration = serviceProvider.GetRequiredService<TelemetryConfiguration>();
            var telemetryProcessorChainBuilder = telemetryConfiguration.DefaultTelemetrySink.TelemetryProcessorChainBuilder;
    
            // Use custom processor and sampling
            telemetryProcessorChainBuilder.Use((next) => new CustomTelemetryProcessor(next));
            telemetryProcessorChainBuilder.UseSampling(SamplingPercentage, excludedTypes: "Exception");
            telemetryProcessorChainBuilder.Build();
    
            // Get logger
            var logger = serviceProvider.GetRequiredService<ILogger<Program>>();
    
            // Perform work
            DoWork(logger);
    
            // Flush telemetry
            var telemetryClient = serviceProvider.GetRequiredService<TelemetryClient>();
            telemetryClient.Flush();
            await Task.Delay(5000); // Wait for telemetry to be sent
    
            Console.WriteLine("Done. Press any key to exit.");
            Console.ReadKey();
        }
    
        static void DoWork(ILogger logger)
        {
            const int iterations = 50;
            const int errors = 10;
    
            var random = new Random();
            var errorsSet = new HashSet<int>();
            while (errorsSet.Count < errors)
            {
                errorsSet.Add(random.Next(0, iterations));
            }
    
            // Perform operations with logging
            for (int i = 0; i < iterations; i++)
            {
                string operationId = Guid.NewGuid().ToString();
                logger.LogInformation($"Begin operation: {operationId}");
    
                if (errorsSet.Contains(i))
                {
                    logger.LogError(new Exception("Sample exception"), $"Error in operation: {operationId}");
                }
    
                logger.LogInformation($"End operation: {operationId}");
            }
        }
    }
    
    public class CustomTelemetryProcessor : ITelemetryProcessor
    {
        private readonly ITelemetryProcessor _next;
        private readonly HashSet<string> _preservedOperationIds = new HashSet<string>();

        public CustomTelemetryProcessor(ITelemetryProcessor next)
        {
            _next = next;
        }

        public void Process(ITelemetry item)
        {
            // Get the operation ID of the telemetry item
            string operationId = item.Context.Operation.Id;

            // If the telemetry item is an exception, remember its operation ID
            if (item is ExceptionTelemetry)
            {
                _preservedOperationIds.Add(operationId);
            }

            // Check if the operation ID is in the set of preserved operation IDs
            if (_preservedOperationIds.Contains(operationId))
            {
                // Pass the item to the next processor. Note that _next here is the
                // sampling processor registered after this one, so the item may
                // still be sampled unless its type is excluded from sampling.
                _next.Process(item);
            }
            else
            {
                // Apply your desired sampling decision here; as written, the item
                // is simply handed to the rest of the chain
                _next.Process(item);
            }
        }
    }


    Note that a custom ITelemetryProcessor processes one telemetry item at a time, which makes it challenging to guarantee that related traces are preserved when exceptions occur: telemetry emitted before the exception has already passed through the processor.
