Domain Events & Pub/Sub Pattern: Manual Implementation vs MediatR

Domain Events and Pub/Sub Pattern

Domain events and the publish/subscribe pattern are essential for building decoupled, scalable, and maintainable systems. Whether we're working on a microservices architecture or a modular monolith, these patterns help us manage complexity and enforce boundaries between components.

Let's explore two approaches to implementing domain events and pub/sub:

  • Manual Implementation: A custom-built event publisher.
  • MediatR: A popular library for implementing the mediator pattern.

What Are Domain Events?

An event is something that has happened in the past. A domain event is something that happened in the domain that you want other parts of the same domain (in-process) to be aware of.
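As a minimal sketch of that definition, a domain event can be modeled as an immutable type named in the past tense. The `Product` shape and the `OccurredOnUtc` property are illustrative assumptions, not types taken from this article's code base.

```csharp
using System;

// Hypothetical entity, used only for illustration.
public sealed record Product(int Id, string Name, decimal Price);

// A domain event states a fact that has already happened,
// so it is immutable and named in the past tense.
public sealed record ProductUpdatedEvent(Product Product)
{
    // Assumed metadata: when the change occurred (UTC).
    public DateTime OccurredOnUtc { get; init; } = DateTime.UtcNow;
}
```

Because the event is a record, two instances with the same data compare as equal, which can be convenient when asserting on published events in tests.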

Common Use Cases

  • Cache Updates: When an entity is updated, we may need to invalidate or update a cache.
  • Search Indexing, Auditing, and Logging: When a product is updated, an event can trigger an update of the full-text search index (e.g., Elasticsearch). Events can also be captured for audit trails or log analytics.
  • Publishing to a Message Broker: Domain events can be published to a message broker for processing by other services.
  • Event Sourcing: Domain events can be stored in an event store to reconstruct the state of an entity at any point in time.

Approach 1. Manual Implementation

✅ Pros

  • Full Control: We have complete control over how events are published and how consumers are resolved.
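  • Performance: Avoids reflection and dynamic dispatch on each publish. In the implementation shown later, reflection runs only once, when consumers are discovered at startup.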
  • No External Dependencies: Lightweight and easy to understand.

❌ Cons

  • Boilerplate Code: Requires more code to set up and maintain.
  • Limited Features: No built-in support for pipelines, behaviors, or cross-cutting concerns.

Approach 2. MediatR

MediatR is a popular library that simplifies the implementation of the mediator pattern, which is closely related to pub/sub.

✅ Pros

  • Simplicity: Easy to set up and use.
  • Built-In Features: Supports pipelines, behaviors, and cross-cutting concerns.
  • Rapid Development: Great for rapid prototyping.

❌ Cons

  • Overhead: Adds some overhead due to reflection and dynamic dispatch.
  • Less Flexibility: Limited control over how events are published and handlers are resolved.
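One control point MediatR does expose is the notification publishing strategy. The configuration below is a sketch that assumes MediatR 12 or later, where `INotificationPublisher` was introduced with the built-in `ForeachAwaitPublisher` (sequential, the default) and `TaskWhenAllPublisher`. Check the exact behavior against the version you use.

```csharp
using MediatR;
using MediatR.NotificationPublishers;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssemblyContaining<Program>();

    // Assumption: MediatR 12+. Start all handlers and await them together
    // instead of awaiting them one after another.
    cfg.NotificationPublisher = new TaskWhenAllPublisher();
});
```

With `TaskWhenAllPublisher`, a failing handler no longer prevents the other handlers from running, which changes the error-handling semantics compared with the sequential default.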

Implementations

1. Consumers (Subscribers) of Domain Events

using System.Threading;
using System.Threading.Tasks;

public partial interface IConsumer<T>
{
    Task HandleEventAsync(T eventMessage, CancellationToken cancellationToken = default);
}

// Consumers
// Optionally, the same class can also consume ProductInsert and ProductUpdate events.
public class ProductEventConsumer : IConsumer<ProductUpdatedEvent>
{
    public async Task HandleEventAsync(ProductUpdatedEvent eventMessage, CancellationToken cancellationToken = default)
    {
        // Implementation
    }
}

public class ProductCacheUpdateConsumer : IConsumer<ProductUpdatedEvent>
{
    public async Task HandleEventAsync(ProductUpdatedEvent eventMessage, CancellationToken cancellationToken = default)
    {
        // Implementation
    }
}
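To make the empty consumer bodies more concrete, here is one possible cache-invalidating consumer. It is a sketch under stated assumptions: `ProductCache` is a hypothetical in-memory cache, the `Product` and `ProductUpdatedEvent` shapes are illustrative, and `IConsumer<T>` is repeated only so the block compiles on its own.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed record Product(int Id, string Name);
public sealed record ProductUpdatedEvent(Product Product);

public interface IConsumer<T>
{
    Task HandleEventAsync(T eventMessage, CancellationToken cancellationToken = default);
}

// Hypothetical in-memory cache keyed by product id.
public sealed class ProductCache
{
    private readonly ConcurrentDictionary<int, Product> _items = new();

    public void Set(Product product) => _items[product.Id] = product;
    public bool Contains(int id) => _items.ContainsKey(id);
    public void Remove(int id) => _items.TryRemove(id, out _);
}

public sealed class ProductCacheInvalidationConsumer : IConsumer<ProductUpdatedEvent>
{
    private readonly ProductCache _cache;

    public ProductCacheInvalidationConsumer(ProductCache cache) => _cache = cache;

    public Task HandleEventAsync(ProductUpdatedEvent eventMessage, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Evict the stale entry; the next read is expected to reload it.
        _cache.Remove(eventMessage.Product.Id);
        return Task.CompletedTask;
    }
}
```

Evicting rather than overwriting keeps the consumer independent of how the cached value is built, at the cost of one extra cache miss after each update.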

2. Event Publisher Interface and Class

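using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

public partial interface IEventPublisher
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default);
}

public partial class EventPublisher : IEventPublisher
{
    private readonly IServiceProvider _serviceProvider;

    // Event type -> consumers resolved for that event type
    private readonly ConcurrentDictionary<Type, List<object>> _consumerCache = new();

    public EventPublisher(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;

        // Preload consumers during startup (performance optimization)
        PreloadConsumers();
    }

    /// <summary>Passes the event to each cached consumer of TEvent, one after another.</summary>
    /// <returns>A task that represents the asynchronous operation</returns>
    public virtual async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
    {
        // The lookup uses the compile-time type TEvent, not @event.GetType()
        if (_consumerCache.TryGetValue(typeof(TEvent), out var consumers))
        {
            foreach (var consumer in consumers.OfType<IConsumer<TEvent>>())
            {
                try
                {
                    await consumer.HandleEventAsync(@event, cancellationToken);
                }
                catch (Exception ex)
                {
                    // Handle errors (e.g., log ex). Catching here lets the remaining
                    // consumers run even if one of them fails.
                }
            }
        }
    }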
    private void PreloadConsumers()
    {
        // Consumers are resolved once from the root provider and reused for the
        // lifetime of the publisher, so they must not depend on scoped services.
        // (Resolving them from a temporary scope would dispose any IDisposable
        // consumer as soon as that scope ends.)

        // Collect every event type T for which a concrete class implements IConsumer<T>.
        // Note: GetTypes() can throw ReflectionTypeLoadException for assemblies
        // whose dependencies are missing.
        var eventTypes = AppDomain.CurrentDomain.GetAssemblies()
            .SelectMany(a => a.GetTypes())
            .Where(t => t.IsClass && !t.IsAbstract) // Ensure it's a concrete class
            .SelectMany(t => t.GetInterfaces())
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IConsumer<>))
            .Select(i => i.GetGenericArguments()[0])
            .Distinct()
            .ToList(); // Cache the results to avoid repeated enumeration

        foreach (var eventType in eventTypes)
        {
            // Resolve ALL consumers registered for IConsumer<T>. GetRequiredService
            // would return only the last registration, so with two consumers for the
            // same event the cache would hold the same consumer twice.
            var consumerInterface = typeof(IConsumer<>).MakeGenericType(eventType);
            var consumers = _serviceProvider.GetServices(consumerInterface)
                .OfType<object>() // drops nulls
                .ToList();

            // Add the consumers to the cache
            if (consumers.Count > 0)
                _consumerCache[eventType] = consumers;
        }
    }
}
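The manual publisher has no pipeline or behavior concept, but a decorator around `IEventPublisher` can approximate one for cross-cutting concerns. The sketch below is one possible shape and not part of the implementation above: `TimingEventPublisher`, `NoOpEventPublisher`, and the `log` callback are hypothetical names, and `IEventPublisher` is repeated only so the block compiles on its own.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public interface IEventPublisher
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default);
}

// Stand-in for the real publisher so the sketch runs without a DI container.
public sealed class NoOpEventPublisher : IEventPublisher
{
    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        => Task.CompletedTask;
}

// Hypothetical decorator: wraps any IEventPublisher and reports how long each publish took.
public sealed class TimingEventPublisher : IEventPublisher
{
    private readonly IEventPublisher _inner;
    private readonly Action<string> _log;

    public TimingEventPublisher(IEventPublisher inner, Action<string> log)
    {
        _inner = inner;
        _log = log;
    }

    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await _inner.PublishAsync(@event, cancellationToken);
        }
        finally
        {
            // Runs whether the inner publisher succeeded or threw.
            _log($"{typeof(TEvent).Name} published in {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}
```

Several such decorators can be nested to build a small pipeline, though ordering and registration then have to be managed by hand, which is the boilerplate cost noted earlier.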

3. Implementing MediatR (for Benchmark Comparison with the Manual Approach)

using System.Threading;
using System.Threading.Tasks;
using MediatR;

public partial class ProductUpdatedEvent(Product product) : INotification
{
    // Properties as required by the logic
}

// The handler must target the same notification type that is published
public class ProductUpdatedEventHandler : INotificationHandler<ProductUpdatedEvent>
{
    public async Task Handle(ProductUpdatedEvent notification, CancellationToken cancellationToken)
    {
        // Any of the use cases mentioned earlier, for example:
        // - Full-text search index update (e.g., Elasticsearch), auditing, logging
        // - Publishing to a message broker
        // - Storing in an event store for event sourcing
        // - Syncing with external services
    }
}

4. Registering in DI and Usage

// Register the consumers and the custom publisher.
// Note: the singleton publisher resolves the consumers once and caches them,
// so they effectively live as long as the publisher despite the transient registration.
services.AddTransient<IConsumer<ProductUpdatedEvent>, ProductEventConsumer>();
services.AddTransient<IConsumer<ProductUpdatedEvent>, ProductCacheUpdateConsumer>();

services.AddSingleton<IEventPublisher, EventPublisher>();

// Register MediatR
services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblyContaining<Program>());

// Usage
await _customEventPublisher.PublishAsync(new ProductUpdatedEvent(product));
await _mediator.Publish(new ProductUpdatedEvent(product));
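The usage lines leave open where in the application the event is raised. A common choice is to publish only after the state change has succeeded, so the event really describes something that happened. The sketch below assumes a hypothetical `IProductRepository` and `ProductService`. The in-memory classes are stand-ins that let it run without a database or DI container, and `IEventPublisher` is repeated only so the block compiles on its own.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed record Product(int Id, string Name);
public sealed record ProductUpdatedEvent(Product Product);

public interface IEventPublisher
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default);
}

// Hypothetical persistence abstraction; the article does not define one.
public interface IProductRepository
{
    Task UpdateAsync(Product product, CancellationToken cancellationToken = default);
}

public sealed class ProductService
{
    private readonly IProductRepository _repository;
    private readonly IEventPublisher _eventPublisher;

    public ProductService(IProductRepository repository, IEventPublisher eventPublisher)
    {
        _repository = repository;
        _eventPublisher = eventPublisher;
    }

    public async Task UpdateAsync(Product product, CancellationToken cancellationToken = default)
    {
        // Persist first: if this throws, no event is published.
        await _repository.UpdateAsync(product, cancellationToken);

        // The update is now a fact, so consumers can be notified.
        await _eventPublisher.PublishAsync(new ProductUpdatedEvent(product), cancellationToken);
    }
}

// In-memory stand-ins used only to exercise the sketch.
public sealed class InMemoryProductRepository : IProductRepository
{
    public Dictionary<int, Product> Items { get; } = new();

    public Task UpdateAsync(Product product, CancellationToken cancellationToken = default)
    {
        Items[product.Id] = product;
        return Task.CompletedTask;
    }
}

public sealed class RecordingEventPublisher : IEventPublisher
{
    public List<object> Published { get; } = new();

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
    {
        Published.Add(@event!);
        return Task.CompletedTask;
    }
}
```

The same service shape works with MediatR by injecting `IMediator` and calling `Publish` instead of `PublishAsync`.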

5. Benchmark

Summary

Thanks for reading.
