How to use BlockingCollection in C#
By FoxLearn 12/21/2024 4:08:03 AM 22
It provides a thread-safe implementation for managing a producer-consumer pattern, where multiple threads (producers) add items to a collection, and a single thread (consumer) takes those items for processing.
This article explains how BlockingCollection
works, and compares it with other options like ConcurrentQueue
and Channel
.
In a blocking concurrent queue like BlockingCollection
, the consumer thread blocks (or waits) until an item becomes available for processing. This behavior allows efficient synchronization between producers and consumers in a multithreaded environment.
Consider this example where we have one consumer and two producer threads interacting with a BlockingCollection<string>
.
using System.Collections.Concurrent; using System.Threading.Tasks; var blockingQueue = new BlockingCollection<string>(); // Consumer Task.Run(() => { while (!blockingQueue.IsCompleted) { var message = blockingQueue.Take(); // Blocks until a new message is available Console.WriteLine($"{DateTime.Now:hh:mm:ss} Message={message}"); } }); // Multiple Producers Task.Run(async () => { while (true) { blockingQueue.Add("A"); await Task.Delay(TimeSpan.FromSeconds(1)); } }); Task.Run(async () => { while (true) { blockingQueue.Add("B"); await Task.Delay(TimeSpan.FromSeconds(3)); } }); Console.ReadLine();
Output:
10:56:22 Message=B 10:56:22 Message=A 10:56:23 Message=A 10:56:24 Message=A 10:56:25 Message=B 10:56:25 Message=A
The consumer waits for a message to be available in the queue by calling Take()
, which blocks until an item is available.
Two producer threads add items ("A" and "B") at different intervals.
As shown, the consumer processes messages as they arrive, with no risk of concurrency issues due to the thread-safe nature of BlockingCollection
.
BlockingCollection vs ConcurrentQueue
While BlockingCollection
uses ConcurrentQueue
internally, it provides additional functionality to support blocking operations.
Feature | BlockingCollection Method | ConcurrentQueue |
---|---|---|
Blocking dequeue operation | Take() | Not available |
Bounded queue capacity | BlockingCollection(int boundedCapacity) | Not available |
Completed queue (no more enqueues) | CompleteAdding() | Not available |
BlockingCollection
extends ConcurrentQueue
by adding important features such as blocking dequeue operations, bounded capacity, and the ability to mark the queue as completed, preventing further additions.
If blocking indefinitely is not desirable, TryTake()
allows you to attempt to dequeue an item within a specified timeout. If no item is available within the given time frame, it returns false
.
Here’s an example using TryTake()
with a timeout:
var blockingQueue = new BlockingCollection<string>(); Task.Run(() => { while (!blockingQueue.IsCompleted) { Console.WriteLine($"{DateTime.Now:hh:mm:ss} Waiting for message"); if (blockingQueue.TryTake(out var message, TimeSpan.FromSeconds(30))) { Console.WriteLine($"{DateTime.Now:hh:mm:ss} {message}"); } else { Console.WriteLine($"{DateTime.Now:hh:mm:ss} No message in last 30 seconds"); } } });
Output:
10:56:56 Waiting for message 10:56:56 A 10:56:56 Waiting for message 10:56:56 B 10:56:56 Waiting for message 10:56:57 A 10:56:57 Waiting for message 10:56:58 A 10:56:58 Waiting for message
In this case, the consumer checks every 30 seconds for a message. If no message is found, it outputs a "No message in last 30 seconds" message. If a message is found, it proceeds to print it.
If you need the ability to cancel the operation gracefully, you can integrate a CancellationToken
with BlockingCollection
. This is especially useful when you want to stop processing when certain conditions are met, without risking a deadlock.
By using BlockingCollection
, developers can focus on high-level logic without having to manage low-level synchronization details, streamlining the development process and enhancing the reliability of concurrent systems.
- Calculating the Distance Between Two Coordinates in C#
- Could Not Find an Implementation of the Query Pattern
- Fixing Invalid Parameter Type in Attribute Constructor
- Objects added to a BindingSource’s list must all be of the same type
- How to use dictionary with tuples in C#
- How to convert a dictionary to a list in C#
- Dictionary with multiple values per key in C#
- Sock Merchant Problem Solution