How to use Channel as an async queue in C#
By FoxLearn 3/21/2025 2:32:48 AM 39
This pattern consists of two parts:
- Multiple producers enqueueing items by calling
Channel.Writer.WriteAsync()
. - At least one consumer awaiting items to dequeue using
Channel.Reader.ReadAllAsync()
.
This is in contrast to BlockingCollection
, which is a blocking concurrent queue.
How to use a Channel
in C#
1. Create the Channel
The first step is to create a Channel
object. Here’s an example of creating an unbounded (no maximum capacity) Channel
that holds string
values:
using System.Threading.Channels; var channel = Channel.CreateUnbounded<string>();
Note: Use Channel.CreateBounded()
to create a bounded channel with a defined maximum capacity.
I recommend encapsulating the Channel
object within a class, which will be shown in the following steps.
2. Read from the Channel
To dequeue items asynchronously, you can use Channel.Reader.ReadAllAsync()
within a foreach
loop.
using System.Threading.Channels; using System.Threading.Tasks; public class MessageQueue { private Channel<string> channel; public MessageQueue() { channel = Channel.CreateUnbounded<string>(); } public async Task StartConsumer() { await foreach (var message in channel.Reader.ReadAllAsync()) { Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got message: {message}"); } } }
In this example, asynchronous iteration is used with the IAsyncEnumerable
returned by ReadAllAsync()
. This allows the consumer to await the next item if the queue is empty and process the message when available.
3. Write to the Channel
You can enqueue items to the Channel
by calling Channel.Writer.WriteAsync()
.
using System.Threading.Channels; using System.Threading.Tasks; public class MessageQueue { private Channel<string> channel; public MessageQueue() { channel = Channel.CreateUnbounded<string>(); } public async Task StartConsumer() { await foreach (var message in channel.Reader.ReadAllAsync()) { Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got message: {message}"); } } public async Task AddMessage(string message) { await channel.Writer.WriteAsync(message); } }
There are two key scenarios to be aware of when using WriteAsync()
:
- Bounded Channel: If you’re writing to a bounded
Channel
that has reached its capacity,WriteAsync()
will block until there is space available to write the item. - Closed Channel: Attempting to write to a closed
Channel
will result in aChannelClosedException
.
If you want to avoid blocking behavior, consider using TryWrite()
instead of WriteAsync()
. TryWrite()
will return false
if it’s unable to write the item immediately.
Note: A closed Channel
occurs when Writer.Complete()
is called, preventing further writes to the Channel
.
4. Running the Code and Adding Messages
This starts the consumer and then produces several messages:
var messageQueue = new MessageQueue(); // Start the consumer (don't await the task) var consumerTask = messageQueue.StartConsumer(); // Add some messages (producers) await messageQueue.AddMessage("Hello"); await messageQueue.AddMessage("This is an example of using Channel"); await messageQueue.AddMessage("Bye!"); // Await the consumer task to prevent the console app from closing prematurely await consumerTask;
This will output something like the following:
04:40:05 Got message: Hello world 04:40:05 Got message: This is an example of using Channel 04:40:05 Got message: Bye!
- Primitive types in C#
- How to set permissions for a directory in C#
- How to Convert Int to Byte Array in C#
- How to Convert string list to int list in C#
- How to convert timestamp to date in C#
- How to Get all files in a folder in C#
- Case sensitivity in JSON deserialization
- The JSON value could not be converted to Enum