Your Web News in One Place

Help Webnuz

Referal links:

Sign up for GreenGeeks web hosting
April 26, 2021 06:44 pm GMT

Scheduling tons of orchestrator functions concurrently in C

I had a call with an engineer on another team who wanted advice for how to schedule a large number of concurrent orchestration functions in Durable Functions. I shared with him some sample code for how we do it in our own internal performance tests and decided it might be useful to share publicly too.

First, here is the orchestration that we're running for the test. It's just a basic sequential orchestrator that calls a SayHello activity function 5 times.

[FunctionName(nameof(HelloSequence))]public static async Task<List<string>> HelloSequence(    [OrchestrationTrigger] IDurableOrchestrationContext context){    var outputs = new List<string>    {        await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),        await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),        await context.CallActivityAsync<string>(nameof(SayHello), "London"),        await context.CallActivityAsync<string>(nameof(SayHello), "Amsterdam"),        await context.CallActivityAsync<string>(nameof(SayHello), "Mumbai")    };    return outputs;}[FunctionName(nameof(SayHello))]public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";

And here is the HTTP trigger function we use to trigger a performance run. It takes a count parameter from the query string as the number of concurrent "HelloSequence" orchestrations to run. In our tests, we'll often run more than 100K orchestrations concurrently.

[FunctionName(nameof(StartManySequences))]public static async Task<IActionResult> StartManySequences(    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,    [DurableClient] IDurableClient starter,    ILogger log){    if (!int.TryParse(req.Query["count"], out int count) || count < 1)    {        return new BadRequestObjectResult("A 'count' query string parameter is required and it must contain a positive number.");    }    string prefix = await ScheduleManyInstances(starter, nameof(HelloSequence), count, log);    return new OkObjectResult($"Scheduled {count} orchestrations prefixed with '{prefix}'.");}

This method calls into a ScheduleManyInstances helper method, which does the actual scheduling. Before I get into our implementation, however, I think it would be useful to describe what not to do.

Nave implementation #1: Sequential

The most common nave implementation is to use a for-loop with an await in each iteration.

public static async Task<string> ScheduleManyInstances(    IDurableOrchestrationClient client,    string orchestrationName,    int count,    ILogger log){    log.LogWarning($"Scheduling {count} orchestration(s)...");    DateTime utcNow = DateTime.UtcNow;    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");    for (int i = 0; i < count; i++)    {        // Start each instance one-at-a-time        string instanceId = $"{prefix}-{i:X16}";        await client.StartNewAsync(orchestrationName, instanceId);    }    log.LogWarning($"All {count} orchestrations were scheduled successfully!");    return prefix;}

The above is really slow because you're only enqueuing a single orchestration start message at a time. If you're scheduling a large number of orchestrations, then the client that invoked this HTTP function will probably time-out before all the orchestrations are scheduled. Ideally we'd schedule orchestrations in parallel, which leads us to the next bad practice.

Nave implementation #2: Too much parallelism

Next we try using Task.WhenAll to schedule all the orchestrations in parallel. This is better than scheduling orchestrations sequentially because it allows us to queue up new work much more quickly. However, it has a major scalability problem, which I'll describe below.

public static async Task<string> ScheduleManyInstances(    IDurableOrchestrationClient client,    string orchestrationName,    int count,    ILogger log){    log.LogWarning($"Scheduling {count} orchestration(s)...");    DateTime utcNow = DateTime.UtcNow;    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");    // Run all StartNewAsync tasks concurrently    var startTasks = new Task[count];    for (int i = 0; i < count; i++)    {        string instanceId = $"{prefix}-{i:X16}";        startTasks[i] = client.StartNewAsync(orchestrationName, instanceId);    }    await Task.WhenAll(startTasks);    log.LogWarning($"All {count} orchestrations were scheduled successfully!");    return prefix;}

The problem is that if count is a large number (like 100K), you will very quickly exhaust both threads and outbound TCP connections on your VM. This is because the .NET thread scheduler will try to aggressively allocate a huge number of threads to satisfy your concurrency demands. Each of those threads will also try to open a connection to Azure Storage concurrently, requiring a new TCP connection. The result is often that the function will fail, making this implementation highly unreliable.

Nave implementation #3: Throttled Parallelism for Parallel.For

This next approach uses Parallel.For to use a throttled concurrency approach.

public static async Task<string> ScheduleManyInstances(    IDurableOrchestrationClient client,    string orchestrationName,    int count,    ILogger log){    log.LogWarning($"Scheduling {count} orchestration(s)...");    DateTime utcNow = DateTime.UtcNow;    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");    // Use up to 200 threads to schedule orchestrations concurrently    var maxConcurrencyOptions = new ParallelOptions { MaxDegreeOfParallelism = 200 };    Parallel.For(0, count, maxConcurrencyOptions, i =>    {        string instanceId = $"{prefix}-{i:X16}";        // Use GetAwaiter().GetResult() to block since Parallel.For() doesn't support async        client.StartNewAsync(orchestrationName, instanceId).GetAwaiter().GetResult();    });    log.LogWarning($"All {count} orchestrations were scheduled successfully!");    return prefix;}

This works much better than the first two solutions. It fixes the TCP connection exhaustion issue by giving the system enough time to reuse existing TCP connections. It also addresses the thread starvation issue by ensuring we don't use more than 200 threads at the same time.

However, the Parallel.For solution is still inefficient because each degree of parallelism is occupying a dedicated thread, and those threads will get blocked waiting for the StartNewAsync call to complete. Threads are expensive in terms of CPU and memory and take time to allocate. Ideally we'd do this work in a non-blocking way that allows us to aggressively reuse threads.

Final implementation: Throttled parallelism

The solution we use is a variation of the above that provides much better thread reuse by using a fully async implementation. I defined a ParallelForEachAsync helper extension method to achieve this.

public static async Task<string> ScheduleManyInstances(    IDurableOrchestrationClient client,    string orchestrationName,    int count,    ILogger log){    log.LogWarning($"Scheduling {count} orchestration(s)...");    DateTime utcNow = DateTime.UtcNow;    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");    await Enumerable.Range(0, count).ParallelForEachAsync(200, i =>    {        string instanceId = $"{prefix}-{i:X16}";        return client.StartNewAsync(orchestrationName, instanceId);    });    log.LogWarning($"All {count} orchestrations were scheduled successfully!");    return prefix;}

Here is the ParallelForEachAsync extension method implementation, which includes the async parallel throttling behavior.

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> items, int maxConcurrency, Func<T, Task> action){    List<Task> tasks;    if (items is ICollection<T> itemCollection)    {        // optimization to reduce the number of memory allocations        tasks = new List<Task>(itemCollection.Count);    }    else    {        tasks = new List<Task>();    }    using var semaphore = new SemaphoreSlim(maxConcurrency);    foreach (T item in items)    {        tasks.Add(InvokeThrottledAction(item, action, semaphore));    }    await Task.WhenAll(tasks);}static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore){    await semaphore.WaitAsync();    try    {        await action(item);    }    finally    {        semaphore.Release();    }}

As you can see, we use a SemaphoreSlim to ensure we don't execute more than maxConcurrency concurrent actions at the same time. Because the code path is fully async, we can now run many operations in parallel with a much smaller number of threads. Right now we have maxConcurrency set to 200, but it's very possible that a larger number could have worked as well.

This ParallelForEachAsync extension method is generic and can be used for any code that needs to execute asynchronous tasks with a cap on concurrency. You could even use it inside your orchestrator functions when scheduling activities or sub-orchestrations! This is useful, for example, if you need to throttle activity function concurrency in a distributed way to protect downstream resources, like databases, which can only handle a certain number of concurrent operations.

Anyways, I hope this is helpful for anyone doing performance work with Durable Functions. If there is a simple way to do this in .NET and I just didn't know about it, I'm interested in learning about that too!


Original Link: https://dev.to/cgillum/scheduling-tons-of-orchestrator-functions-concurrently-in-c-1ih7

Share this article:    Share on Facebook
View Full Article

Dev To

An online community for sharing and discovering great ideas, having debates, and making friends

More About this Source Visit Dev To