How to batch read with Threading.ChannelReader in C#

By FoxLearn 3/13/2025 3:13:43 AM
In scenarios involving a consumer/producer model, batching the consumption of items can be highly beneficial. Whether you’re inserting a bulk of records into a database or sending multiple data packets over HTTP, sending individual items one by one can be inefficient.

On the other hand, waiting for an entire batch of items before processing is often impractical, especially when you want to start processing before a full batch is received.

This article demonstrates how to efficiently read and process a batch of items using the Threading.ChannelReader in a consumer/producer environment.

ChannelReader.ReadMultipleAsync() Extension for Batching

Suppose you need to handle batches of items, with each batch containing a maximum of 5 items. The goal is to wait asynchronously for at least one item to become available, and then keep reading until you've collected up to 5 items, or until the queue is empty.

To accomplish this, you shouldn't use ChannelReader.ReadAllAsync(). Instead, you can combine WaitToReadAsync() with TryRead().

Here’s a custom extension method to batch read from a ChannelReader:

using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    public static async Task<List<T>> ReadMultipleAsync<T>(this ChannelReader<T> reader, int maxBatchSize, CancellationToken cancellationToken)
    {
        // Wait until at least one item is available
        await reader.WaitToReadAsync(cancellationToken);

        var batch = new List<T>();

        // Collect items until the batch reaches maxBatchSize
        while (batch.Count < maxBatchSize && reader.TryRead(out T message))
        {
            batch.Add(message);
        }

        return batch;
    }
}

This method waits until at least one item is available, then reads up to maxBatchSize items into a batch. Note that it ignores the result of WaitToReadAsync() and doesn't check whether the writer has completed; it assumes a long-running consumer that keeps processing items indefinitely.
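If your producer eventually calls Writer.Complete(), the extension above would wait forever on an empty, completed channel. A completion-aware variant might look like the following sketch (the method name ReadMultipleOrEmptyAsync is hypothetical, not part of the article's API; it relies on WaitToReadAsync() returning false once the channel is completed and drained):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderCompletionExtensions
{
    // Sketch: returns an empty list once the writer has completed and the
    // channel has drained, instead of waiting forever for more items.
    public static async Task<List<T>> ReadMultipleOrEmptyAsync<T>(
        this ChannelReader<T> reader, int maxBatchSize, CancellationToken cancellationToken)
    {
        var batch = new List<T>();

        // WaitToReadAsync returns false when Complete() has been called
        // and all buffered items have already been read.
        if (!await reader.WaitToReadAsync(cancellationToken))
            return batch; // empty batch signals "channel closed"

        while (batch.Count < maxBatchSize && reader.TryRead(out var message))
        {
            batch.Add(message);
        }

        return batch;
    }
}
```

A consumer can then treat an empty batch as the signal to exit its loop.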

Implementing ChannelReader.ReadMultipleAsync() in a Consumer Loop

In this example, we simulate a consumer loop that reads batches of items using the ReadMultipleAsync() extension:

// Create an unbounded message queue (a class-level field; 'var' isn't valid here)
private readonly Channel<int> messageQueue = Channel.CreateUnbounded<int>();

public async Task ConsumerLoop(CancellationToken cancelToken)
{
    while (!cancelToken.IsCancellationRequested)
    {
        // Read a batch of up to 5 items
        var batch = await messageQueue.Reader.ReadMultipleAsync(maxBatchSize: 5, cancelToken);

        // Process the batch
        Console.WriteLine($"Processing batch: {string.Join(" ", batch)}");
        await ProcessBatch(batch);
        Console.WriteLine($"Finished processing batch: {string.Join(" ", batch)}");
        Console.WriteLine();
    }
}
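The ProcessBatch() method called above isn't defined in the article. A minimal placeholder might look like this (the delay is a stand-in for whatever I/O-bound work the batch represents):

```csharp
// Hypothetical stand-in for ProcessBatch, which the article references but
// doesn't define. The delay simulates I/O-bound work such as a bulk
// database insert or a single HTTP request for the whole batch.
private async Task ProcessBatch(List<int> batch)
{
    await Task.Delay(100);
}
```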

Output:

If the items 1 through 12 are enqueued, the output would be:

Processing batch: 1 2 3 4 5
Finished processing batch: 1 2 3 4 5

Processing batch: 6 7 8 9 10
Finished processing batch: 6 7 8 9 10

Processing batch: 11 12
Finished processing batch: 11 12

Notice how the batches are processed in groups of 5, but the last batch contains only 2 items (a partial batch). The process doesn't wait for more items to arrive but continues to process partial batches if necessary.
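To reproduce this output, the producer side (not shown in the article) might be wired up roughly like this sketch, assuming the messageQueue field and ConsumerLoop() method defined above:

```csharp
// Sketch of the producer side (assumed wiring; not in the original article):
// start the consumer loop, then write items 1..12 into the unbounded channel.
var cts = new CancellationTokenSource();
Task consumer = ConsumerLoop(cts.Token);

for (int i = 1; i <= 12; i++)
{
    // TryWrite always succeeds on an unbounded channel
    messageQueue.Writer.TryWrite(i);
}
```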

Why ChannelReader.ReadAllAsync() Doesn’t Work for Batching

While ChannelReader.ReadAllAsync() is useful for streaming every item in a queue, it's not designed for batch processing. It yields items one at a time, and it gives the caller no way to tell that the queue is temporarily empty. If you accumulate items into a batch inside the await foreach loop, a partial batch simply sits there while the loop waits for more items to arrive.

Take a look at the code for ReadAllAsync():

public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (TryRead(out T? item))
        {
            yield return item;
        }
    }
}

Here’s how it behaves when there’s only one item in the queue:

  1. WaitToReadAsync() returns true because there’s one item.
  2. TryRead() reads the item.
  3. The item is yielded to the calling code.
  4. TryRead() returns false since the queue is now empty.
  5. WaitToReadAsync() will wait for additional items to arrive before continuing.

Thus, after yielding the single item, ReadAllAsync() goes back to waiting for more items. The enumeration never signals that the queue is temporarily empty, so a consumer that accumulates items into batches has no opportunity to flush a partial batch, which makes ReadAllAsync() unsuitable for this scenario.

Consumer Loop with ReadAllAsync()

Here's a simple example of using ReadAllAsync() in a consumer loop.

public async Task ConsumerLoop(CancellationToken cancelToken)
{
    while (!cancelToken.IsCancellationRequested)
    {
        List<int> batch = new List<int>();

        await foreach (var message in messageQueue.Reader.ReadAllAsync(cancelToken))
        {
            Console.WriteLine($"Adding {message} to batch");
            batch.Add(message);

            // If the batch reaches size 5, process it and start a new one
            if (batch.Count == 5)
            {
                Console.WriteLine($"Processing batch: {string.Join(" ", batch)}");
                await ProcessBatch(batch);
                Console.WriteLine($"Finished processing batch: {string.Join(" ", batch)}");
                Console.WriteLine();
                batch.Clear();
            }
        }
    }
}

Output:

For the items 1 through 6, the output would be:

Adding 1 to batch
Adding 2 to batch
Adding 3 to batch
Adding 4 to batch
Adding 5 to batch
Processing batch: 1 2 3 4 5
Finished processing batch: 1 2 3 4 5

Adding 6 to batch
<Waiting indefinitely because there's no new item to complete the batch>

As you can see, after processing the first batch (1-5), the consumer waits for additional items to fill up the next batch. Since the queue doesn’t get more items, the consumer doesn’t proceed.

ChannelReader.ReadAllAsync() works well for processing items individually, or in full batches as shown above, but it can't handle partial batches. To enable batch reading with more control, use a custom approach like ReadMultipleAsync(), which handles both full and partial batches without waiting indefinitely for the queue to fill.