![Domain Events and Pub/Sub Pattern]()
Domain events and the publish/subscribe pattern are essential for building decoupled, scalable, and maintainable systems. Whether we're working on a microservices architecture or a modular monolith, these patterns help us manage complexity and enforce boundaries between components.
Let's explore two approaches to implementing domain events and pub/sub:
- Manual Implementation: A custom-built event publisher.
- MediatR: A popular library for implementing the mediator pattern.
What Are Domain Events?
An event is something that has happened in the past. A domain event is something that happened in the domain that you want other parts of the same domain (in-process) to be aware of.
Common Use Cases
- Cache Updates: When an entity is updated, we may need to invalidate or update a cache.
- Full-Text Search Indexing Update, Auditing & Logging: When a product is updated, trigger an event to update the search index (e.g., Elasticsearch). Also, we can capture events for audit trails or log analytics.
- Publishing to a Message Broker: Domain events can be published to a message broker for processing by other services.
- Event Sourcing: Domain events can be stored in an event store to reconstruct the state of an entity at any point in time.
Approach 1. Manual Implementation
✅ Pros
- Full Control: We have complete control over how events are published and how consumers are resolved.
- Performance: Avoids the overhead of reflection and dynamic dispatch.
- No External Dependencies: Lightweight and easy to understand.
❌ Cons
- Boilerplate Code: Requires more code to set up and maintain.
- Limited Features: No built-in support for pipelines, behaviors, or cross-cutting concerns.
Approach 2. MediatR
MediatR is a popular library that simplifies the implementation of the mediator pattern, which is closely related to pub/sub.
✅ Pros
- Simplicity: Easy to set up and use.
- Built-In Features: Supports pipelines, behaviors, and cross-cutting concerns.
- Rapid Development: Great for rapid prototyping.
❌ Cons
- Overhead: Adds some overhead due to reflection and dynamic dispatch.
- Less Flexibility: Limited control over how events are published and handlers are resolved.
Implementations
1. Consumers (Subscribers) of Domain Events
public partial interface IConsumer<T>
{
Task HandleEventAsync(T eventMessage,CancellationToken cancellationToken = default);
}
// Consumers
public class ProductEventConsumer :
IConsumer<ProductUpdatedEvent>
// Optionaly ProductInsert and ProductUpdate
{
public async Task HandleEventAsync(ProductUpdatedEvent eventMessage, CancellationToken cancellationToken = default)
{
// Implementation
}
}
public class ProductCacheUpdateConsumer :
IConsumer<ProductUpdatedEvent>
{
public async Task HandleEventAsync(ProductUpdatedEvent eventMessage, CancellationToken cancellationToken = default)
{
// Implementation
}
}
2. Event Publisher Interfaces And class
public partial interface IEventPublisher
{
Task PublishAsync<TEvent>(TEvent @event,CancellationToken cancellationToken = default);
}
public partial class EventPublisher : IEventPublisher
{
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentDictionary<Type, List<object>> _consumerCache = new();
public EventPublisher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
//to preload consumers during start up (perfomarnce optimization)
PreloadConsumers();
}
/// <returns>A task that represents the asynchronous operation</returns>
public virtual async Task PublishAsync<TEvent>(TEvent @event,CancellationToken cancellationToken = default)
{
if (_consumerCache.TryGetValue(typeof(TEvent), out var consumers))
{
foreach (var consumer in consumers.OfType<IConsumer<TEvent>>())
{
try
{
await consumer.HandleEventAsync(@event, cancellationToken);
}
catch (Exception ex)
{
// Handle errors
}
}
}
}
private void PreloadConsumers()
{
using var scope = _serviceProvider.CreateScope();
var serviceProvider = scope.ServiceProvider;
// Discover all concrete IConsumer<T> implementations
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a => a.GetTypes())
.Where(t => t.IsClass && !t.IsAbstract && // Ensure it's a concrete class
t.GetInterfaces()
.Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IConsumer<>)))
.ToList(); // Cache the results to avoid repeated enumeration
foreach (var consumerType in consumerTypes)
{
// Get the event type (T) from IConsumer<T>
var eventType = consumerType.GetInterfaces()
.First(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IConsumer<>))
.GetGenericArguments()[0];
// Resolve the consumer using its interface (IConsumer<T>)
var consumerInterface = typeof(IConsumer<>).MakeGenericType(eventType);
var consumer = serviceProvider.GetRequiredService(consumerInterface);
// Add the consumer to the cache
var consumerList = _consumerCache.GetOrAdd(eventType, _ => new List<object>());
consumerList.Add(consumer);
}
}
3. Implementing MediatR (to compare benchmark with Manual one)
public partial class ProductUpdatedEvent(Product product) : INotification
{
// Properties based required logic
}
public class ProductUpdatedEventHandler : INotificationHandler<ProductUpdatedEventMediatr>
{
public async Task Handle(ProductUpdatedEventMediatr notification
, CancellationToken cancellationToken)
{
//any use case (i already mentioned in the content)
// Full Text search indexing update (eg ElasticSearch) , Auditing , Logging
// Publishing to message broker
// Storing as eventstore for event sourcing
// Syncing with external services and etc
}
}
4. Registring in DI and Usage
// Register Custom Publisher
services.AddTransient<IConsumer<ProductUpdatedEvent>, ProductEventConsumer>();
services.AddTransient<IConsumer<ProductUpdatedEvent>, ProductCacheUpdateConsumer>();
services.AddSingleton<IEventPublisher, EventPublisher>();
// Register MediatR
services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblyContaining<Program>());
//Usage
await _customEventPublisher.PublishAsync(new ProductUpdatedEvent(product));
await _mediator.Publish(new ProductUpdatedEvent(product));
5. Benchmark
![Summary]()
Thanks for reading.