Async SSE endpoint in ASP.NET Core

By FoxLearn 3/4/2025 9:11:19 AM

Server-Sent Events (SSE) let a client receive real-time updates from the server over a single long-lived HTTP connection. SSE is simpler than WebSockets, but it is one-way (server to client), which makes it a good fit for notifications.

In this article, I will show how to implement a real-time notification system using an async SSE endpoint that listens for notifications from a queue and delivers them to connected clients.

We will implement a Web API with an async SSE notification endpoint that does the following:

  • Clients subscribe to receive notifications.
  • Notifications are added to a queue.
  • The server asynchronously dequeues and sends notifications to subscribed clients.

Creating an Async SSE Notification Endpoint

The first step is to create an SSE endpoint where clients can subscribe and receive real-time notifications.

using Microsoft.AspNetCore.Mvc;

[ApiController]
public class NotificationController : ControllerBase
{
    private readonly INotificationQueue _notificationQueue;

    public NotificationController(INotificationQueue notificationQueue)
    {
        _notificationQueue = notificationQueue;
    }

    [HttpGet]
    [Route("notifications/subscribe/{userId}")]
    public async Task Subscribe(string userId)
    {
        Response.ContentType = "text/event-stream";
        Response.StatusCode = 200;

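        // Dispose asynchronously: a synchronous Dispose flushes synchronously,
        // which Kestrel disallows by default. leaveOpen leaves Response.Body to the framework.
        await using var streamWriter = new StreamWriter(Response.Body, leaveOpen: true);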
        
        _notificationQueue.Register(userId);

        try
        {
            await foreach (var notification in _notificationQueue.DequeueAsync(userId, HttpContext.RequestAborted))
            {
                // An SSE event is terminated by a blank line, so end each message with "\n\n"
                await streamWriter.WriteAsync($"data: {notification}\n\n");
                await streamWriter.FlushAsync();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the client disconnects
        }
        finally
        {
            _notificationQueue.Unregister(userId);
        }
    }
}
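
In the SSE wire format, a blank line ends each event, and every line of a multi-line payload needs its own data: prefix. The following is a minimal sketch of a helper that frames a notification accordingly. SseFormatter and Format are hypothetical names introduced here for illustration; they are not part of ASP.NET Core.

```csharp
using System;
using System.Text;

// Hypothetical helper (not part of ASP.NET Core): frames a payload as one SSE event.
public static class SseFormatter
{
    public static string Format(string data)
    {
        var sb = new StringBuilder();

        // Normalize line endings, then emit one "data:" field per payload line
        foreach (var line in data.Replace("\r\n", "\n").Replace('\r', '\n').Split('\n'))
        {
            sb.Append("data: ").Append(line).Append('\n');
        }

        // The blank line tells the client the event is complete
        sb.Append('\n');
        return sb.ToString();
    }
}
```

With a helper like this, the write inside the loop could become await streamWriter.WriteAsync(SseFormatter.Format(notification)), which keeps multi-line notifications from being cut off at the first line break.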

Adding an Endpoint to Post Notifications

We also need an endpoint for sending notifications to subscribed clients. Add the following action to NotificationController:

[HttpPost]
[Route("notifications/{userId}")]
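// [FromBody] makes the message bind from the JSON request body instead of the query string
public async Task<IActionResult> PostNotification(string userId, [FromBody] string message)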
{
    try
    {
        await _notificationQueue.EnqueueAsync(userId, message, HttpContext.RequestAborted);
        return Ok();
    }
    catch (Exception ex)
    {
        return BadRequest(ex.Message);
    }
}

In this example:

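  • This endpoint accepts a notification and adds it to the target user's queue.
  • If that user is subscribed, the notification is delivered in real time. If not, EnqueueAsync (shown below) finds no channel, so the message is silently dropped and the endpoint still returns 200 OK.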

Creating an Async Notification Queue

1. Define an Interface

public interface INotificationQueue
{
    void Register(string userId);
    void Unregister(string userId);
    IAsyncEnumerable<string> DequeueAsync(string userId, CancellationToken cancelToken);
    Task EnqueueAsync(string userId, string message, CancellationToken cancelToken);
}

2. Implement the NotificationQueue

using System.Collections.Concurrent;
using System.Threading.Channels;

public class NotificationQueue : INotificationQueue
{
    private readonly ConcurrentDictionary<string, Channel<string>> _subscribers = new();

    public void Register(string userId)
    {
        _subscribers.TryAdd(userId, Channel.CreateUnbounded<string>());
    }

    public void Unregister(string userId)
    {
        _subscribers.TryRemove(userId, out _);
    }

    public async Task EnqueueAsync(string userId, string message, CancellationToken cancelToken)
    {
        if (_subscribers.TryGetValue(userId, out var channel))
        {
            await channel.Writer.WriteAsync(message, cancelToken);
        }
    }

    public IAsyncEnumerable<string> DequeueAsync(string userId, CancellationToken cancelToken)
    {
        if (_subscribers.TryGetValue(userId, out var channel))
        {
            return channel.Reader.ReadAllAsync(cancelToken);
        }
        throw new ArgumentException($"User {userId} isn't registered.");
    }
}

In this example:

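  • The queue tracks subscribers in a ConcurrentDictionary<string, Channel<string>>, with one channel per user ID.
  • Each channel buffers that user's notifications and lets the reader await new messages without blocking a thread.
  • The subscriber receives notifications in real time by iterating the stream returned by DequeueAsync.
  • Because the dictionary is keyed by user ID, a second connection for the same user shares the first connection's channel, and whichever connection closes first unregisters both.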
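
To see the channel behavior in isolation, here is a small sketch that writes two messages to an unbounded channel and then drains it with ReadAllAsync. ChannelDemo and RunAsync are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class: shows write/read ordering on a single channel.
public static class ChannelDemo
{
    public static async Task<List<string>> RunAsync(CancellationToken cancelToken = default)
    {
        var channel = Channel.CreateUnbounded<string>();

        // Writes to an unbounded channel complete without waiting for a reader
        await channel.Writer.WriteAsync("first", cancelToken);
        await channel.Writer.WriteAsync("second", cancelToken);

        // Completing the writer lets ReadAllAsync finish once the buffer is empty
        channel.Writer.Complete();

        var received = new List<string>();
        await foreach (var message in channel.Reader.ReadAllAsync(cancelToken))
        {
            received.Add(message);
        }
        return received; // messages come out in the order they were written
    }
}
```

Note the difference from NotificationQueue: there the writer is never completed, so the await foreach loop in the Subscribe endpoint only ends when the cancellation token fires, which happens when the client disconnects.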

Register INotificationQueue in Startup

To inject INotificationQueue into NotificationController, register it in Startup.cs:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<INotificationQueue, NotificationQueue>();
        services.AddControllers();
    }
}

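Controllers are created per request, so registering the queue as a singleton ensures that every request shares the same NotificationQueue instance. Without it, the subscribe and post endpoints would each see their own empty set of subscribers.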
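
Projects created from the .NET 6 or later templates have no Startup class and configure services in Program.cs instead. A sketch of the equivalent registration, assuming the INotificationQueue and NotificationQueue types from above live in the same project:

```csharp
// Program.cs (minimal hosting model)
var builder = WebApplication.CreateBuilder(args);

// Same registrations as in Startup.ConfigureServices
builder.Services.AddSingleton<INotificationQueue, NotificationQueue>();
builder.Services.AddControllers();

var app = builder.Build();

// Routes requests to attribute-routed controllers such as NotificationController
app.MapControllers();

app.Run();
```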

Testing the Real-Time Notification System

Subscribe Using Curl

Open a terminal and run:

curl -N http://localhost:5000/notifications/subscribe/user123

This will keep the connection open, listening for notifications.

Send Notifications Using Curl

curl -X POST "http://localhost:5000/notifications/user123" -H "Content-Type: application/json" -d '"Hello, User!"'

The subscribed client will immediately receive:

data: Hello, User!
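
The same stream can also be consumed from C#. The sketch below reads the response line by line and extracts the payload of each data: line. SseClient, ReadDataLinesAsync, and ListenAsync are hypothetical names introduced for illustration, and the parser only handles the data: field that this endpoint emits.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical client helper for reading the notification stream.
public static class SseClient
{
    // Yields the payload of each "data:" line until the reader reaches the end of the stream.
    public static async IAsyncEnumerable<string> ReadDataLinesAsync(TextReader reader)
    {
        while (await reader.ReadLineAsync() is { } line)
        {
            if (!line.StartsWith("data:", StringComparison.Ordinal))
            {
                continue; // skip blank separator lines and other fields
            }

            var value = line.Substring(5);
            if (value.StartsWith(' '))
            {
                value = value.Substring(1); // drop the single space after the colon
            }
            yield return value;
        }
    }

    // Connects to the subscribe endpoint and prints notifications as they arrive.
    public static async Task ListenAsync(string url)
    {
        using var http = new HttpClient { Timeout = Timeout.InfiniteTimeSpan };

        // ResponseHeadersRead returns as soon as headers arrive instead of buffering the body
        using var response = await http.GetAsync(url, HttpCompletionOption.ResponseHeadersRead);
        response.EnsureSuccessStatusCode();

        using var reader = new StreamReader(await response.Content.ReadAsStreamAsync());
        await foreach (var data in ReadDataLinesAsync(reader))
        {
            Console.WriteLine(data);
        }
    }
}
```

For example, await SseClient.ListenAsync("http://localhost:5000/notifications/subscribe/user123") would print each notification posted for user123 until the server closes the connection.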

This implementation demonstrates how to use ASP.NET Core to build an async SSE notification endpoint. By using Channel<T>, we efficiently manage real-time notifications without blocking threads.