How to use Channel as an async queue in C#

By FoxLearn 3/21/2025 2:32:48 AM
The Channel class from System.Threading.Channels provides a non-blocking, asynchronous queue that implements the producer-consumer pattern.

This pattern consists of two parts:

  1. One or more producers enqueue items by calling Channel.Writer.WriteAsync().
  2. At least one consumer awaits items and dequeues them using Channel.Reader.ReadAllAsync().

This is in contrast to BlockingCollection, which is a blocking concurrent queue.

How to use a Channel in C#

1. Create the Channel

The first step is to create a Channel object. Here’s an example of creating an unbounded (no maximum capacity) Channel that holds string values:

using System.Threading.Channels;

var channel = Channel.CreateUnbounded<string>();

Note: Use Channel.CreateBounded() to create a bounded channel with a defined maximum capacity.
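As a minimal sketch of the bounded variant, the snippet below creates a channel through BoundedChannelOptions. The capacity of 2 and the variable names are assumptions chosen only for illustration; FullMode is set explicitly to its default value (Wait) to make the behavior visible.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical capacity of 2, chosen only for illustration.
var options = new BoundedChannelOptions(2)
{
    // Wait is the default: a writer waits asynchronously while the channel is full.
    FullMode = BoundedChannelFullMode.Wait
};
var bounded = Channel.CreateBounded<string>(options);

// Two writes fit within the capacity, so both complete immediately.
await bounded.Writer.WriteAsync("first");
await bounded.Writer.WriteAsync("second");

// Items come back out in the order they were written.
string a = await bounded.Reader.ReadAsync();
string b = await bounded.Reader.ReadAsync();
Console.WriteLine($"{a}, {b}"); // first, second
```

If only the capacity matters, Channel.CreateBounded<string>(2) is a shorter overload that uses the default options.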

I recommend encapsulating the Channel object within a class, which will be shown in the following steps.

2. Read from the Channel

To dequeue items asynchronously, you can use Channel.Reader.ReadAllAsync() within an await foreach loop.

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageQueue
{
    // The channel is created once and never reassigned.
    private readonly Channel<string> channel;

    public MessageQueue()
    {
        channel = Channel.CreateUnbounded<string>();
    }

    // Reads messages until the channel is completed and drained.
    public async Task StartConsumer()
    {
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got message: {message}");
        }
    }
}

In this example, asynchronous iteration is used with the IAsyncEnumerable returned by ReadAllAsync(). This allows the consumer to await the next item if the queue is empty and process the message when available.
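For readers who want to see what that loop amounts to, here is a rough sketch of the same consumption pattern written by hand with WaitToReadAsync() and TryRead(). It is an illustration of the idea, not the article's code; the sample items and the received list are assumptions added for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("one");
channel.Writer.TryWrite("two");
channel.Writer.Complete(); // no more items will arrive, so the loop below can end

var received = new List<string>();

// WaitToReadAsync() completes with true when an item is available and with
// false once the channel has been completed and fully drained.
while (await channel.Reader.WaitToReadAsync())
{
    // Take everything that is currently available without awaiting.
    while (channel.Reader.TryRead(out var message))
    {
        received.Add(message);
    }
}

Console.WriteLine(string.Join(", ", received)); // one, two
```

The await foreach form is usually easier to read; the manual loop is mainly useful when you need control over how items are batched.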

3. Write to the Channel

You can enqueue items to the Channel by calling Channel.Writer.WriteAsync().

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class MessageQueue
{
    // The channel is created once and never reassigned.
    private readonly Channel<string> channel;

    public MessageQueue()
    {
        channel = Channel.CreateUnbounded<string>();
    }

    // Reads messages until the channel is completed and drained.
    public async Task StartConsumer()
    {
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} Got message: {message}");
        }
    }

    // Enqueues a message; on an unbounded channel this completes immediately.
    public async Task AddMessage(string message)
    {
        await channel.Writer.WriteAsync(message);
    }
}

There are two key scenarios to be aware of when using WriteAsync():

  • Bounded Channel: If you’re writing to a bounded Channel that has reached its capacity, WriteAsync() does not block the calling thread. With the default full mode (BoundedChannelFullMode.Wait), the operation it returns completes only once space becomes available for the item.
  • Closed Channel: Attempting to write to a closed Channel will result in a ChannelClosedException.

If you want to avoid waiting, consider using TryWrite() instead of WriteAsync(). TryWrite() returns false if it’s unable to write the item immediately.
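The difference between the two calls on a full bounded channel can be sketched as follows. The capacity of 1 and the integer payloads are assumptions made for the demonstration, and the channel uses the default full mode.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical capacity of 1 so that a single write fills the channel.
var channel = Channel.CreateBounded<int>(1);
await channel.Writer.WriteAsync(1);

// TryWrite() returns right away and reports that the item was not accepted.
bool accepted = channel.Writer.TryWrite(2);
Console.WriteLine($"TryWrite accepted: {accepted}"); // TryWrite accepted: False

// WriteAsync() hands back a pending operation instead of blocking the thread.
Task pendingWrite = channel.Writer.WriteAsync(3).AsTask();
bool completedBeforeRead = pendingWrite.IsCompleted;
Console.WriteLine($"Write completed before read: {completedBeforeRead}"); // False

// Reading one item frees a slot, which lets the pending write finish.
int item = await channel.Reader.ReadAsync();
await pendingWrite;
Console.WriteLine($"Read {item}, pending write completed: {pendingWrite.IsCompleted}");
```

Note that the item passed to the failed TryWrite() call is simply not enqueued; the caller decides whether to retry, drop it, or fall back to WriteAsync().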

Note: A Channel is closed once Writer.Complete() has been called, which prevents further writes. Items already in the Channel can still be read.
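A small sketch of what happens around Writer.Complete() is shown below. The message strings and the tracking variables are assumptions used only to make the behavior observable.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
await channel.Writer.WriteAsync("last message");
channel.Writer.Complete(); // close the channel for writing

// TryWrite() reports failure on a closed channel instead of throwing.
bool tryWriteResult = channel.Writer.TryWrite("too late");

// WriteAsync() surfaces the closed state as a ChannelClosedException.
bool threw = false;
try
{
    await channel.Writer.WriteAsync("too late");
}
catch (ChannelClosedException)
{
    threw = true;
}

// Items written before Complete() are still delivered, then the loop ends.
int count = 0;
await foreach (var message in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"Got message: {message}");
    count++;
}

Console.WriteLine($"TryWrite: {tryWriteResult}, threw: {threw}, read: {count}");
// TryWrite: False, threw: True, read: 1
```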

4. Running the Code and Adding Messages

This starts the consumer and then produces several messages:

var messageQueue = new MessageQueue();

// Start the consumer (don't await the task)
var consumerTask = messageQueue.StartConsumer(); 

// Add some messages (producers)
await messageQueue.AddMessage("Hello");
await messageQueue.AddMessage("This is an example of using Channel");
await messageQueue.AddMessage("Bye!");

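// Await the consumer task to prevent the console app from closing prematurely.
// Writer.Complete() is never called, so this await never finishes and the app
// keeps running until it is stopped manually.
await consumerTask;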

This will output something like the following:

04:40:05 Got message: Hello
04:40:05 Got message: This is an example of using Channel
04:40:05 Got message: Bye!
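The walkthrough above uses a single producer and never completes the channel, so the consumer task never finishes. As a hedged sketch of a variation, the snippet below runs two producers concurrently and calls Writer.Complete() once they are done, which lets the consumer loop exit. It works on the channel directly rather than through MessageQueue; ProduceAsync, ConsumeAsync, and the message text are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
var received = new List<string>();

// Single consumer: runs until the writer is completed and the channel is drained.
async Task ConsumeAsync()
{
    await foreach (var message in channel.Reader.ReadAllAsync())
    {
        received.Add(message);
    }
}

// Hypothetical producer: writes three messages tagged with its id.
async Task ProduceAsync(int producerId)
{
    for (int i = 1; i <= 3; i++)
    {
        await channel.Writer.WriteAsync($"producer {producerId}, message {i}");
    }
}

// Start the consumer without awaiting it yet.
var consumerTask = ConsumeAsync();

// Run two producers concurrently, then signal that no more items are coming.
await Task.WhenAll(Task.Run(() => ProduceAsync(1)), Task.Run(() => ProduceAsync(2)));
channel.Writer.Complete();

// This await finishes because the channel was completed.
await consumerTask;
Console.WriteLine($"Received {received.Count} messages"); // Received 6 messages
```

Messages from the same producer arrive in the order they were written, but the interleaving between the two producers can differ from run to run.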