How to batch read with Threading.ChannelReader in C#
By FoxLearn 3/13/2025 3:13:43 AM
Waiting for an entire batch of items to arrive before processing is often impractical, especially when you want to start working as soon as some items are available.
This article demonstrates how to efficiently read and process a batch of items with ChannelReader from System.Threading.Channels in a producer/consumer environment.
ChannelReader.ReadMultipleAsync() Extension for Batching
Suppose you need to handle batches of items, with each batch containing a maximum of 5 items. The goal is to wait asynchronously for at least one item to become available, and then keep reading until you've collected up to 5 items, or until the queue is empty.
To accomplish this, you shouldn’t use ChannelReader.ReadAllAsync(); instead, you can combine WaitToReadAsync() with TryRead().
Here’s a custom extension method that batch reads from a ChannelReader<T>:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    public static async Task<List<T>> ReadMultipleAsync<T>(this ChannelReader<T> reader, int maxBatchSize, CancellationToken cancellationToken)
    {
        // Wait until at least one item is available
        await reader.WaitToReadAsync(cancellationToken);

        var batch = new List<T>();

        // Collect items until the batch reaches maxBatchSize or the channel is empty
        while (batch.Count < maxBatchSize && reader.TryRead(out T message))
        {
            batch.Add(message);
        }

        return batch;
    }
}
This method waits for items to be available, then reads them into a batch. It doesn't check if the writer is completed, assuming the consumer will continuously process items.
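If your producer does complete the channel at some point, a small variation can observe that as well. The following is only a sketch under that assumption (the method name ReadMultipleOrEmptyAsync is hypothetical, placed in the same ChannelReaderExtensions class): it uses the bool returned by WaitToReadAsync() and returns an empty batch once the writer has completed and the channel is drained.

// Sketch: same idea as ReadMultipleAsync, but it observes writer completion.
// Assumption: the caller treats an empty batch as "channel completed and drained".
public static async Task<List<T>> ReadMultipleOrEmptyAsync<T>(this ChannelReader<T> reader, int maxBatchSize, CancellationToken cancellationToken)
{
    var batch = new List<T>();

    // WaitToReadAsync returns false once the writer has completed and no items remain
    if (!await reader.WaitToReadAsync(cancellationToken))
    {
        return batch; // empty batch signals completion
    }

    while (batch.Count < maxBatchSize && reader.TryRead(out T message))
    {
        batch.Add(message);
    }

    return batch;
}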
Implementing ChannelReader.ReadMultipleAsync() in a Consumer Loop
In this example, we simulate a consumer loop that reads batches of items using the ReadMultipleAsync() extension:
// Create an unbounded message queue
private readonly Channel<int> messageQueue = Channel.CreateUnbounded<int>();

public async Task ConsumerLoop(CancellationToken cancelToken)
{
    while (!cancelToken.IsCancellationRequested)
    {
        // Read a batch of up to 5 items
        var batch = await messageQueue.Reader.ReadMultipleAsync(maxBatchSize: 5, cancelToken);

        // Process the batch (ProcessBatch is your own batch-handling logic)
        Console.WriteLine($"Processing batch: {string.Join(" ", batch)}");
        await ProcessBatch(batch);
        Console.WriteLine($"Finished processing batch: {string.Join(" ", batch)}");
        Console.WriteLine();
    }
}
Output:
If the items 1 through 12 are enqueued, the output would be:
Processing batch: 1 2 3 4 5
Finished processing batch: 1 2 3 4 5

Processing batch: 6 7 8 9 10
Finished processing batch: 6 7 8 9 10

Processing batch: 11 12
Finished processing batch: 11 12
Notice how the batches are processed in groups of 5, but the last batch contains only 2 items (a partial batch). The process doesn't wait for more items to arrive but continues to process partial batches if necessary.
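For context, here is a minimal producer sketch of my own (the ProduceItems name is hypothetical and not part of the original example) that writes the items 1 through 12 into messageQueue:

// Sketch: enqueue the items 1..12 into the channel consumed by ConsumerLoop above
public void ProduceItems()
{
    for (int i = 1; i <= 12; i++)
    {
        // TryWrite always succeeds on an unbounded channel that hasn't been completed
        messageQueue.Writer.TryWrite(i);
    }
}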
Why ChannelReader.ReadAllAsync() Doesn’t Work for Batching
While ChannelReader.ReadAllAsync() is useful for reading all available items in a queue, it’s not designed for batch processing. It yields items one by one, so if you accumulate them into batches yourself, only complete batches ever get processed, never partial ones. You can’t control when a batch is handled: the await foreach loop keeps waiting for further items until the batch is full.
Take a look at the code for ReadAllAsync():
public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (TryRead(out T? item))
        {
            yield return item;
        }
    }
}
Here’s how it behaves when there’s only one item in the queue:
- WaitToReadAsync() returns true because there’s one item.
- TryRead() reads the item.
- The item is yielded to the calling code.
- TryRead() returns false since the queue is now empty.
- WaitToReadAsync() waits for additional items to arrive before continuing.
Thus, when batching on top of ReadAllAsync(), you can’t process that single item until enough additional items arrive to complete the batch, which makes it unsuitable for partial batch processing.
Consumer Loop with ReadAllAsync()
Here's a simple example of using ReadAllAsync() in a consumer loop:
public async Task ConsumerLoop(CancellationToken cancelToken)
{
    while (!cancelToken.IsCancellationRequested)
    {
        List<int> batch = new List<int>();

        await foreach (var message in messageQueue.Reader.ReadAllAsync(cancelToken))
        {
            Console.WriteLine($"Adding {message} to batch");
            batch.Add(message);

            // If the batch reaches size 5, process it and start a new batch
            if (batch.Count == 5)
            {
                Console.WriteLine($"Processing batch: {string.Join(" ", batch)}");
                await ProcessBatch(batch);
                Console.WriteLine($"Finished processing batch: {string.Join(" ", batch)}");
                Console.WriteLine();

                batch.Clear();
            }
        }
    }
}
Output:
For the items 1 through 6, the output would be:
Adding 1 to batch
Adding 2 to batch
Adding 3 to batch
Adding 4 to batch
Adding 5 to batch
Processing batch: 1 2 3 4 5
Finished processing batch: 1 2 3 4 5

Adding 6 to batch
<Waiting indefinitely because there's no new item to complete the batch>
As you can see, after processing the first batch (1-5), the consumer waits for additional items to fill up the next batch. Since the queue doesn’t get more items, the consumer doesn’t proceed.
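If the producer is genuinely finished, one mitigation is to complete the writer; the await foreach loop then exits and the leftover partial batch can be flushed afterwards. The following is only a sketch of my own, assuming the producer eventually calls messageQueue.Writer.Complete() (the ConsumerLoopWithFlush name is hypothetical):

// Sketch: assumes the producer eventually calls messageQueue.Writer.Complete()
public async Task ConsumerLoopWithFlush(CancellationToken cancelToken)
{
    List<int> batch = new List<int>();

    // ReadAllAsync finishes enumerating once the writer is completed and the channel is drained
    await foreach (var message in messageQueue.Reader.ReadAllAsync(cancelToken))
    {
        batch.Add(message);

        if (batch.Count == 5)
        {
            await ProcessBatch(batch);
            batch.Clear();
        }
    }

    // Flush the leftover partial batch after the loop ends
    if (batch.Count > 0)
    {
        await ProcessBatch(batch);
    }
}

This only helps when production has ended for good; it doesn't let you process partial batches while the producer is still running, which is exactly the case ReadMultipleAsync() addresses.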
ChannelReader.ReadAllAsync() is great for processing either individual items or entire batches, but it isn't suitable for processing partial batches. To enable batch reading with more control, you can use a custom approach like ReadMultipleAsync(), which allows you to handle both full and partial batches without unnecessary delays.
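For comparison, when per-item processing is enough, ReadAllAsync() works well on its own; here is a minimal sketch (ConsumeOneByOne and ProcessItem are hypothetical names standing in for your own handler):

// Sketch: per-item consumption with ReadAllAsync, no batching involved
public async Task ConsumeOneByOne(CancellationToken cancelToken)
{
    // Each item is handled as soon as it arrives; there is no batch to fill, so nothing waits
    await foreach (var message in messageQueue.Reader.ReadAllAsync(cancelToken))
    {
        await ProcessItem(message);
    }
}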