How to use asynchronous streams in C#

By FoxLearn 1/3/2025 3:43:25 AM   52
In C# 8.0, asynchronous programming is enhanced with the introduction of IAsyncEnumerable, allowing for the efficient consumption and creation of data streams asynchronously.

IAsyncEnumerable is similar to the familiar IEnumerable, except that it allows for asynchronous iteration. With IAsyncEnumerable, you can fetch data in an asynchronous manner, waiting for the next data point without blocking the current thread.

IAsyncDisposable, IAsyncEnumerable, and IAsyncEnumerator in C# 8.0

Asynchronous streams in C# allow you to consume data streams asynchronously. With the release of .NET Standard 2.1, three key interfaces IAsyncDisposable, IAsyncEnumerable, and IAsyncEnumerator were introduced to facilitate working with these streams.

public interface IAsyncDisposable
{
    ValueTask DisposeAsync();
}
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken
    token = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    ValueTask<bool> MoveNextAsync();
    T Current { get; }
}

Why use asynchronous streams?

When dealing with data retrieval from a data store, a simple method can fetch all results asynchronously and return them at once. However, if data needs to be paged, multiple calls are required, which can be inefficient.

Instead of using yield return with IEnumerable, which blocks the call, IAsyncEnumerable allows asynchronous streaming, enabling data to be sent back incrementally, improving scalability and performance.

Create an asynchronous stream in C# 8.0

To use this feature in Visual Studio 2019, follow these steps:

  1. Create a new .NET Core Console App project.
  2. Set the project to use C# 8.0 by adjusting the language version in the project settings.

In this case, we simulate fetching temperature data from various weather stations asynchronously:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace WeatherStationExample
{
    class Program
    {
        const int DELAY = 1500;  // Simulate network delay of 1.5 seconds
        const string[] Stations = { "Station A", "Station B", "Station C" };

        static async Task Main(string[] args)
        {
            await foreach (var temperature in GetTemperatureData())
            {
                Console.WriteLine(temperature);
            }
            Console.ReadLine();
        }

        static async IAsyncEnumerable<string> GetTemperatureData()
        {
            foreach (var station in Stations)
            {
                // Simulate fetching temperature data from the station asynchronously
                await Task.Delay(DELAY);  
                var temperature = $"Temperature at {station}: {GetRandomTemperature()}°C";
                yield return temperature;  // Yield data as soon as it's available
            }
        }

        static double GetRandomTemperature()
        {
            var random = new Random();
            return random.NextDouble() * 30 + 10;  // Random temperature between 10°C and 40°C
        }
    }
}

In this example:

  • GetTemperatureData: This method simulates fetching temperature data from multiple weather stations. Each weather station's data is fetched asynchronously, with a delay to simulate network latency.
  • IAsyncEnumerable: The method returns an asynchronous stream of temperature data. The await foreach loop in Main will consume the data as it is fetched, without waiting for all data to arrive first.
  • GetRandomTemperature: A helper function that generates a random temperature between 10°C and 40°C, simulating the data coming from each station.

In this example, the asynchronous stream allows the application to process each temperature reading as soon as it's available, improving performance when handling multiple asynchronous operations like fetching data from external sources.

IAsyncEnumerable, or asynchronous streams, is a key feature introduced in C# 8.0, enabling more efficient, cleaner, and higher-performing code for handling asynchronous data streams.