How to use BlockingCollection in C#

By FoxLearn 12/21/2024 4:08:03 AM
The BlockingCollection class in .NET is a powerful tool designed to simplify working with blocking, concurrent queues.

It provides a thread-safe implementation of the producer-consumer pattern, where one or more threads (producers) add items to a collection, and one or more threads (consumers) take those items for processing.

This article explains how BlockingCollection works, and compares it with other options like ConcurrentQueue and Channel.

In a blocking concurrent queue like BlockingCollection, the consumer thread blocks (or waits) until an item becomes available for processing. This behavior allows efficient synchronization between producers and consumers in a multithreaded environment.

Consider this example where we have one consumer and two producer threads interacting with a BlockingCollection<string>.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var blockingQueue = new BlockingCollection<string>();

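// Consumer
Task.Run(() =>
{
    // Note: if CompleteAdding() is called while Take() is waiting on an empty
    // collection, Take() throws InvalidOperationException. This sample never
    // completes the collection, so the loop runs until the process exits.
    while (!blockingQueue.IsCompleted)
    {
        var message = blockingQueue.Take(); // Blocks until a new message is available
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Message={message}");
    }
});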

// Multiple Producers
Task.Run(async () =>
{
    while (true)
    {
        blockingQueue.Add("A");
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
});

Task.Run(async () =>
{
    while (true)
    {
        blockingQueue.Add("B");
        await Task.Delay(TimeSpan.FromSeconds(3));
    }
});

Console.ReadLine();

Output:

10:56:22 Message=B
10:56:22 Message=A
10:56:23 Message=A
10:56:24 Message=A
10:56:25 Message=B
10:56:25 Message=A

The consumer waits for a message to be available in the queue by calling Take(), which blocks until an item is available.

Two producer threads add items ("A" and "B") at different intervals.

As shown, the consumer processes messages as they arrive. No explicit locking is needed, because Add() and Take() on a BlockingCollection are thread-safe.
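The example above runs until the process exits. As a minimal sketch of a variant that shuts down cleanly, the code below assumes each producer adds a fixed number of items (the counts and the `received` list are illustrative, not part of the original example). It uses GetConsumingEnumerable(), which blocks while the collection is empty and ends the loop once CompleteAdding() has been called and the remaining items have been taken.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var queue = new BlockingCollection<string>();
var received = new List<string>(); // only touched by the consumer until it finishes

// Consumer: the foreach loop ends after CompleteAdding() is called
// and the collection has been drained.
var consumer = Task.Run(() =>
{
    foreach (var message in queue.GetConsumingEnumerable())
    {
        received.Add(message);
        Console.WriteLine($"Message={message}");
    }
});

// Two producers, each adding a fixed number of items (illustrative counts).
var producerA = Task.Run(() => { for (int i = 0; i < 3; i++) queue.Add("A"); });
var producerB = Task.Run(() => { for (int i = 0; i < 2; i++) queue.Add("B"); });

await Task.WhenAll(producerA, producerB);
queue.CompleteAdding(); // signal that no more items will be added
await consumer;         // completes once the remaining items are processed

Console.WriteLine($"Processed {received.Count} messages");
```

The order of "A" and "B" messages depends on thread scheduling, but all five items are processed before the consumer task completes.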

BlockingCollection vs ConcurrentQueue

By default, BlockingCollection uses a ConcurrentQueue internally, and it adds functionality to support blocking operations.

Feature | BlockingCollection member | ConcurrentQueue
--- | --- | ---
Blocking dequeue operation | Take() | Not available
Bounded queue capacity | BlockingCollection(int boundedCapacity) | Not available
Completed queue (no more enqueues) | CompleteAdding() | Not available

BlockingCollection builds on ConcurrentQueue (it wraps one rather than inheriting from it) by adding important features such as blocking dequeue operations, bounded capacity, and the ability to mark the queue as completed, preventing further additions.
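To make bounded capacity and completion concrete, here is a small single-threaded sketch. The capacity of 2 and the variable names are arbitrary choices for illustration. It uses TryAdd(), which returns false immediately when the collection is full, where Add() would block instead.

```csharp
using System;
using System.Collections.Concurrent;

// Bounded to 2 items (illustrative capacity).
var bounded = new BlockingCollection<int>(boundedCapacity: 2);

bool first = bounded.TryAdd(1);  // fits
bool second = bounded.TryAdd(2); // fits, collection is now full
bool third = bounded.TryAdd(3);  // rejected: at capacity (Add() would block here)

Console.WriteLine($"first={first} second={second} third={third} Count={bounded.Count}");

int item = bounded.Take();       // removes the oldest item and frees one slot
bool retry = bounded.TryAdd(3);  // now there is room again

// After CompleteAdding(), further Add/TryAdd calls throw InvalidOperationException.
bounded.CompleteAdding();

// IsAddingCompleted is true right away; IsCompleted only becomes true
// once the remaining items have also been taken.
Console.WriteLine($"IsAddingCompleted={bounded.IsAddingCompleted} IsCompleted={bounded.IsCompleted}");
```

A bounded collection is one way to apply back-pressure: producers that call Add() are slowed to the consumer's pace instead of growing the queue without limit.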

If blocking indefinitely is not desirable, TryTake() allows you to attempt to dequeue an item within a specified timeout. If no item is available within the given time frame, it returns false.

Here’s an example using TryTake() with a timeout:

var blockingQueue = new BlockingCollection<string>();

Task.Run(() =>
{
    while (!blockingQueue.IsCompleted)
    {
        Console.WriteLine($"{DateTime.Now:hh:mm:ss} Waiting for message");
        if (blockingQueue.TryTake(out var message, TimeSpan.FromSeconds(30)))
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} {message}");
        }
        else
        {
            Console.WriteLine($"{DateTime.Now:hh:mm:ss} No message in last 30 seconds");
        }
    }
});

Output:

10:56:56 Waiting for message
10:56:56 A
10:56:56 Waiting for message
10:56:56 B
10:56:56 Waiting for message
10:56:57 A
10:56:57 Waiting for message
10:56:58 A
10:56:58 Waiting for message

In this case, the consumer waits up to 30 seconds for a message. If one arrives within that time, TryTake() returns true right away and the message is printed. Otherwise it returns false, and the consumer prints "No message in last 30 seconds" before waiting again.

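If you need the ability to cancel the operation gracefully, you can integrate a CancellationToken with BlockingCollection. Take(), TryTake(), Add(), and GetConsumingEnumerable() all have overloads that accept a token, and a blocked call throws OperationCanceledException when the token is canceled. This is especially useful when you want to stop processing when certain conditions are met, without leaving a consumer blocked forever on an empty queue.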
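As a rough sketch of cancellation, the consumer below calls the Take(CancellationToken) overload and treats OperationCanceledException as its signal to stop. The 200 ms delay and the `canceled` flag are assumptions made for the demonstration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var queue = new BlockingCollection<string>();
var cts = new CancellationTokenSource();
var canceled = false; // illustrative flag so the outcome can be inspected

var consumer = Task.Run(() =>
{
    try
    {
        while (true)
        {
            // Blocks until an item is available or the token is canceled.
            var message = queue.Take(cts.Token);
            Console.WriteLine($"Message={message}");
        }
    }
    catch (OperationCanceledException)
    {
        // Thrown by Take() once cancellation has been requested.
        canceled = true;
        Console.WriteLine("Consumer canceled");
    }
});

queue.Add("A");
await Task.Delay(200); // give the consumer time to process the item (arbitrary delay)
cts.Cancel();          // unblocks the waiting Take() call
await consumer;
cts.Dispose();
```

Catching the exception inside the consumer keeps the task from ending in a canceled or faulted state, so awaiting it afterwards does not throw.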

By using BlockingCollection, developers can focus on high-level logic without having to manage low-level synchronization details, streamlining the development process and enhancing the reliability of concurrent systems.