I am having a very bad experience with Durable Functions. I have the following code:

public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context, string input)
{         
  int limit = 200;
  int offset = 0;
  bool isLastPage = false;         

  var options = TaskOptions.FromRetryPolicy(new RetryPolicy(
          maxNumberOfAttempts: 3,
          firstRetryInterval: TimeSpan.FromSeconds(5)));

  while (!isLastPage)
  {
      try
      {
          var batchResult = await context.CallActivityAsync<BatchResult>("FetchZaraProductsBatch", new BatchRequest
          {
              Limit = limit,
              Offset = offset,
              Query = input,
              Section = input
          }, options: options);

          isLastPage = batchResult.IsLastPage;
          offset = batchResult.lastPickedPosition;

          ReportStatus(context, batchResult.ClothingItems.Count, offset, input);
      }
      catch (Exception ex)
      {
          _logger.LogError($"Failed to fetch batch after retries. Error: {ex.Message}");
          throw;
      }
  }
}

This Durable Function should invoke the activity function as many times as it takes to reach the last page. Yet once it is deployed to the cloud it behaves erratically: for example, it triggers one invocation per 5 minutes, then it runs 6 times, and then it stops, so it doesn't even run for 5 minutes. My second question: how can I increase the timeout to 10 minutes? I tried setting it in host.json, but it doesn't work.

2 Answers


  1. Chosen as BEST ANSWER

    I have one more question

    try
    {
        var batchResult = await context.CallActivityAsync<BatchResultZara>("FetchZaraProductsBatch", new BatchRequest
        {
            Limit = limit,
            Offset = input.Offset,
            Query = input.Section,
            Section = input.Section,
            CorrelationId = context.InstanceId
        }, options: options);
    
        isLastPage = batchResult.IsLastPage;
        input.Offset = batchResult.LastPickedPosition;
    
        totalClothes = await context.CallActivityAsync<int>("AccumulateTotalClothes", new AccumulateRequest
        {
            PreviousTotal = totalClothes,
            BatchTotal = batchResult.ItemsSearched
        });
    
        if (!isLastPage)
        {
            context.ContinueAsNew(input); // This is the important bit.
        }
    }
    catch (Exception ex)
    {
        logger.LogError($"Failed to fetch batch after retries. Error: {ex.Message}");
        throw;
    }
    
    // After all batches are processed, log via an activity function to ensure it's called once
    await context.CallActivityAsync("LogFinalProcessingResults", new ProcessingResultInput
    {
        Section = input.Section,
        TotalClothes = totalClothes
    });
    

    I want to accumulate all imported results and log them only once, at the end of the orchestration, but right now it seems the log is written every time. How can I achieve that? @Andrew B


  2. First I’ll describe some things that you seem to have misunderstood, so it’s easier to understand. And then at the end I’ll give you an example of how you could re-write your code.

    Problems with your current approach

    There are several problems.

    Some may contribute to the problem you’re seeing, while others will cause your Function to fail in different ways. But they all need addressing. I’ll explain later in more detail, but this is the summary:

    • You can’t have an infinite while loop in a Durable Function Orchestration. The replay history will eventually fill up and break the Durable Orchestration entirely.^1
    • Every Activity must return the same result every time, for a given set of parameters, during a given Orchestration run. Your infinite while loop means that it’s one continuous orchestration run. The Orchestration may choose to re-run the same Activity call with the same parameters… And when it does this, it expects the same result^2. You haven’t shown your Activity code, but I suspect it’s non-deterministic; i.e. it can return different results each time you call it. This will cause lots of strange problems.
    • Your ReportStatus method needs to be inside another Activity Function. I’ll explain below, but this is because the Orchestration effectively executes each code line more than once (the "replay" effect) meaning that it will call ReportStatus repeatedly with the same values. It will not behave how you expect.
    • Relying on a longer timeout is just putting off the problem until later. If your pattern isn’t resilient to scaling for larger workloads, then you’ll still hit the 10-minute timeout. Even the Premium plan, although it is called "unbounded", is practically targeting 60 minutes^6. The good news is that you can usually re-structure Durable code so that it works fine even with the default 5-minute timeout.

    In other words there’s nothing wrong with Durable Functions here. But rather, you need to re-think how to use them.
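
    On the timeout question itself: the limit is set with the top-level functionTimeout property in host.json, written as a timespan string. A minimal sketch, assuming a Consumption plan (where the default is 5 minutes and 10 minutes is the maximum) and a version 2.0 host.json deployed at the root of the Function App:

    ```json
    {
      "version": "2.0",
      "functionTimeout": "00:10:00"
    }
    ```

    This only raises the ceiling for each individual function execution, such as a single Activity call. It doesn’t change anything about the replay issues described above.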

    What will help you understand

    You’ll understand the above problems better if you read up on how Durable Orchestrations use their replay history.^3

    A brief precis on how the Durable Functions work: they physically exit from memory each time you await an Activity. When they resume after an Activity, they actually begin right from the start again. This is hard to get your head around, because you’d think "that can’t be right, because it seems to carry on from just after the await?"

    But the magic is: they check the replay history to see which parts they have already completed. They can therefore skip any Activities that already happened, so it feels like they have resumed from where they left off.
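
    To make the replay idea concrete, here is a toy console program that imitates it. It is only a sketch and does not use the Durable Functions API: ToyContext and Suspend are hypothetical names invented for this illustration, and the "activities" are just lambdas that return a number. The point is that the orchestrator code starts from the top on every run, while each activity executes only once because later runs read its result from the history.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical signal meaning "the orchestrator is unloaded from memory here".
    class Suspend : Exception { }

    // Hypothetical stand-in for an orchestration context; NOT the real Durable Functions type.
    class ToyContext
    {
        private readonly List<int> _history; // results of activities that already completed
        private int _position;               // number of activity calls made during this run

        public ToyContext(List<int> history) => _history = history;

        public int CallActivity(Func<int> activity)
        {
            if (_position < _history.Count)
            {
                // Completed in an earlier run: return the recorded result without executing it.
                return _history[_position++];
            }

            // First time this call is reached: execute it, record the result, then unload.
            _history.Add(activity());
            throw new Suspend();
        }
    }

    static class Program
    {
        static int Orchestrator(ToyContext context)
        {
            // Plain code in the orchestrator body runs again on every replay.
            Console.WriteLine("orchestrator code starts from the top");
            int a = context.CallActivity(() => 1);
            int b = context.CallActivity(() => 2);
            return a + b;
        }

        static void Main()
        {
            var history = new List<int>();
            while (true)
            {
                try
                {
                    int result = Orchestrator(new ToyContext(history));
                    Console.WriteLine($"completed with {result}");
                    break;
                }
                catch (Suspend)
                {
                    Console.WriteLine($"unloaded; history holds {history.Count} result(s)");
                }
            }
        }
    }
    ```

    Running this prints "orchestrator code starts from the top" three times, even though each activity lambda executes only once. That is the same reason a plain method call in a real orchestrator body, such as ReportStatus, gets repeated with the same values.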

    How you can fix your code

    I can see two possible approaches:

    1. Keep your approach of retrieving a page (i.e. 200) at a time, but use the Eternal Orchestrations pattern^4 instead of the while loop.
    2. Switch to having one function that retrieves the full list of work items (let’s say there are 10,000) and then processes them with parallelisation, using the fan-out/fan-in pattern.^5

    I’ll give a brief re-write of your code for each of those two approaches.

    Note that I’ve made some assumptions and simplified the example so it’s easier to see the key points (e.g. I removed the try..catch and the retry logic.)

    1. Using Eternal Orchestrations

    We can remove your unsafe while loop, and instead use the ContinueAsNew method. This causes the Orchestration to repeat in a safe way, passing the new offset each time:

    public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // An orchestration has a single input, so the query and the offset travel together.
        // OrchestratorInput is a hypothetical class with Query and Offset properties.
        var input = context.GetInput<OrchestratorInput>();
    
        var batchResult = await context.CallActivityAsync<BatchResult>("FetchZaraProductsBatch", new BatchRequest
        {
            Limit = 200,
            Offset = input.Offset,
            Query = input.Query,
            Section = input.Query
        });
    
        await context.CallActivityAsync("ReportStatus" /*...etc...*/);
    
        if (!batchResult.IsLastPage)
        {
            // Restart the orchestration with a fresh history, carrying the new offset forward.
            input.Offset = batchResult.lastPickedPosition;
            context.ContinueAsNew(input); // This is the important bit.
        }
    }
    

    2. Using Fan-Out/Fan-In

    You haven’t shown what your FetchZaraProductsBatch is doing, so it’s hard to say if this is the right approach. But let’s assume your process is something like:

    1. Retrieve a list of all the Zara products
    2. For each one, process it

    Your code at the moment would therefore be processing 200 products in each batch, but only sequentially, i.e. one at a time. Let’s assume this takes 10 seconds to process each Product. Your Activity Function would take 2,000 seconds, which would cause a timeout. The process would fail.

    A better approach might be fan-out/fan-in. You’d retrieve the full list of Products (let’s say there are 10,000 of them) and process each one in its own Activity, but benefitting from a natural degree of parallelisation.

    public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        string input = context.GetInput<string>();
    
        // Fan-out: fetch the whole list once, then start one Activity per product. (Select needs System.Linq.)
        ZaraProduct[] itemsToProcess = await context.CallActivityAsync<ZaraProduct[]>("FetchZaraProducts_ButDontProcessThem", input);
    
        var asParallelTasks = itemsToProcess
            .Select(item => context.CallActivityAsync<ZaraProduct>("ProcessSingleZaraProduct_AndReportStatus", item))
            .ToArray();
        
        // Fan-in: wait for every Activity to finish.
        await Task.WhenAll(asParallelTasks);
    }
    

    Note that the Orchestration will stop once it’s processed all the current products. You would also need to re-trigger the Orchestration at some point later; either with a timer, or even combining this approach with the Eternal Orchestration pattern above.

    3. Combining Eternal Orchestration with Fan-Out/Fan-In

    You don’t give enough information about the use-case, so I’m having to guess what you want. But let’s say you want the process to be:

    1. Get all the products (like the Fan example)
    2. Process them all (also the Fan example)
    3. Wait 5 minutes
    4. Run again to process any further products (like the Eternal example)

    …then the code might look like this:

    public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        string input = context.GetInput<string>();
    
        // Fan-Out/Fan-In (Select needs System.Linq.)
        ZaraProduct[] itemsToProcess = await context.CallActivityAsync<ZaraProduct[]>("FetchZaraProducts_ButDontProcessThem", input);
    
        var asParallelTasks = itemsToProcess
            .Select(item => context.CallActivityAsync<ZaraProduct>("ProcessSingleZaraProduct_AndReportStatus", item))
            .ToArray();
        
        await Task.WhenAll(asParallelTasks);
    
        // Wait 5 minutes, using a durable timer rather than Task.Delay.
        DateTime nextRun = context.CurrentUtcDateTime.AddMinutes(5);
        await context.CreateTimer(nextRun, CancellationToken.None);
    
        // Repeat Eternally
        context.ContinueAsNew(input);
    }
    

    Hope that helps!
