Author blog addresses:
https://www.whuanle.cn
https://www.cnblogs.com/whuanle
.NET Advanced Development Series Tutorial:
https://docs.whuanle.cn/zh/maomi_framework
Event Bus
An event bus is often used in software development as a tool to decouple communication between components. In the development process of domain-driven design, since a system contains multiple modules, to avoid direct function calls between modules, an event bus can be used as a medium for inter-module communication, decoupling dependencies between modules.
In this chapter, we will learn how to write an event bus framework. This article references many design ideas of the event bus from the MASA Framework.
Using the Event Bus
Before writing an event bus framework, let's first understand how to use Maomi.EventBus. The sample code refers to the Demo8.Console project.
Create a project, then introduce the Maomi.EventBus package through NuGet.
Here we simulate a user registration process. Suppose that after a user submits information, the system processing flow is: check the verification code, write the user information to the Users table, initialize user data, and send an email.
First, define an event model class. The model class must inherit from the Event abstract class or the IEvent interface.
public record class MyEvent : Event
{
public string Name { get; set; }
public string EMail { get; set; }
public override string ToString()
{
return $"用户名: {Name} ,邮箱: {EMail}";
}
}
Next, write the event executors. In Maomi.EventBus, an event executor is a method rather than a type. There are no naming constraints for the executor method; the only requirement is that the method parameter contains the MyEvent event. The type where the executor method resides does not need to inherit from any interface; it only needs to be marked with the EventAttribute attribute.
First, check whether the user’s verification code is correct:
[Event] // Marks that this type contains event handlers
public class CheckImageCodeEventHandler
{
// Marks this method as an event handler and sets the execution order
[EventHandler(Order = 0)]
public void Check(MyEvent @event)
{
Console.WriteLine(@event.ToString());
}
}
When the framework scans and finds that CheckImageCodeEventHandler has the Event marker, it will be automatically registered in the container with a scoped lifetime. Since CheckImageCodeEventHandler is registered in the container, other services can be injected into its constructor.
[EventHandler(Order = 0)] defines the execution order of the executor after the event is triggered. Each event can have multiple executors, and they have an order among them, sorted by the EventHandlerAttribute.Order property. A single type can contain multiple executor methods, which can subscribe to the same event or different events. Each executor method has a corresponding cancel executor with the same order, used to roll back previous operations.
After writing the verification code executor method, we continue to improve the user registration process. The complete code example is as follows:
[Event]
public class UserRegisterEventHandler
{
[EventHandler(Order = 1)]
public void InsertDb(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 0)
Console.WriteLine("√ User information has been added to the database");
else throw new Exception("× Failed to write user information to the database");
}
[EventHandler(Order = 1, IsCancel = true)]
public void CancelInsertDb(MyEvent @event)
{
Console.WriteLine("Registration failed, refresh the verification code");
}
[EventHandler(Order = 2)]
public void InitUser(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 0)
Console.WriteLine("√ User data initialized, system generated default user permissions and data");
else throw new Exception("× Failed to initialize user data");
}
[EventHandler(Order = 2, IsCancel = true)]
public void CancelInitUser(MyEvent @event)
{
Console.WriteLine("Revoke user registration information");
}
[EventHandler(Order = 3)]
public void SendEmail(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 0)
Console.WriteLine("√ Verification email sent successfully");
else throw new Exception("× Failed to send verification email");
}
[EventHandler(Order = 3, IsCancel = true)]
public void CancelSendEmail(MyEvent @event)
{
Console.WriteLine("× Revoke initialized user data");
}
}
As you can see, each executor method has its own execution order, and each has a corresponding cancel executor. You only need to mark IsCancel = true. Executors and cancel executors can also be placed in different types. If an error occurs when executing SendEmail, the framework will execute the cancel executors one by one in the order: CancelSendEmail, CancelInitUser, CancelInsertDb.
We can also configure middleware to intercept events, record event logs, and determine whether the event can be executed.
public class LoggingMiddleware<TEvent> : IEventMiddleware<TEvent> where TEvent : IEvent
{
public async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{
Console.WriteLine("----- Handling command {0} ({1})", @event.GetType().Name, @event.ToString());
await next();
}
}
Middleware interceptors can determine whether an event should continue executing. If the event should proceed, use await next();. You can also wrap await next(); with a database transaction so that all database operations of the event can be rolled back together. This reduces the amount of manual cancel code and increases reliability, preventing rollback failures caused by exceptions or application crashes during the cancellation process.
Using the event bus is very simple. You only need to register the .AddEventBus() service, and then publish events using the IEventBus service.
static async Task Main()
{
var ioc = new ServiceCollection();
ioc.AddEventBus(middleware: typeof(LoggingMiddleware<>));
ioc.AddLogging(build => build.AddConsole());
var services = ioc.BuildServiceProvider();
var eventBus = services.GetRequiredService<IEventBus>();
await eventBus.PublishAsync(new MyEvent()
{
Name = "工良",
EMail = "工良@maomi.com"
});
}
Event Bus Design
The Maomi.EventBus project file structure is as follows:
// Event model interface
IEvent.cs
// Event model abstract implementation
Event.cs
// Event bus interface
IEventBus.cs
// Event bus implementation
EventBus.cs
// Extension functions
EventBusExtensions.cs
// Attribute markers
EventHandlerAttribute.cs
// Records event executor information
EventInfo.cs
// Event middleware
IEventMiddleware.cs
// Expression tree builder
InvokeBuilder.cs
Interface Abstraction
First is the simplest event model. Each event must inherit from this interface so that it can be recorded in logs, allowing quick location during debugging and troubleshooting.
// Event interface, used to pass parameters through events
public interface IEvent
{
// Unique event identifier
Guid GetEventId();
void SetEventId(Guid eventId);
DateTime GetCreationTime();
void SetCreationTime(DateTime creationTime);
}
// Simplified implementation of events, used to pass parameters through events
public abstract record Event : IEvent
{
private Guid _eventId;
private DateTime _creationTime;
protected Event() : this(Guid.NewGuid(), DateTime.UtcNow) { }
protected Event(Guid eventId, DateTime creationTime)
{
_eventId = eventId;
_creationTime = creationTime;
}
public Guid GetEventId() => _eventId;
public void SetEventId(Guid eventId) => _eventId = eventId;
public DateTime GetCreationTime() => _creationTime;
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
}
Through the event model, the basic format of an event is defined. Users can extend the event model by inheriting from IEvent or Event.
Next are two executor markers, used to mark types and methods respectively. The purpose of EventAttribute is simply to tell the program that this class contains event executors. EventHandlerAttribute marks that this method is an event executor, which event it should bind to, and its execution order.
// Marks that a class contains event executors
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class EventAttribute : Attribute { }
// Marks that a method is an event executor
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)]
public class EventHandlerAttribute : Attribute
{
// Event order
public int Order { get; set; } = 0;
// Whether it is a cancel event
public bool IsCancel { get; set; } = false;
}
Executor Encapsulation
Since we use a non-interface inheritance approach to implement event handling, executor methods do not have fixed names or parameter formats, making them more flexible. Therefore, the framework must solve how to identify and invoke these methods.
Executor methods may have the following four forms:
// Synchronous method
void My(MyEvent data)
// Asynchronous method
Task My(MyEvent data)
// Synchronous with cancellation
void My(MyEvent data, CancellationToken cancellationToken)
// Asynchronous with cancellation
Task My(MyEvent data, CancellationToken cancellationToken)
Since there are four forms of executor methods, how does the framework scan, recognize, and register them when the application starts? And after an event is triggered, how should the framework invoke them?
First, we can divide these four cases into synchronous and asynchronous methods, then wrap them using two delegates. object target represents the event type, and params object?[] parameters represents the parameters.
// Define function formats that support both asynchronous and non-asynchronous executors
internal delegate Task TaskInvokeDelegate(object target, params object?[] parameters);
internal delegate void VoidInvokeDelegate(object target, object?[] parameters);
Two void My() functions will be wrapped by expression trees into the void VoidInvokeDelegate() delegate, while two Task My() methods will be wrapped into the Task TaskInvokeDelegate delegate. Although two delegates are defined, to unify the format we ultimately convert VoidInvokeDelegate into TaskInvokeDelegate. Therefore, the framework only needs to uniformly execute asynchronous methods using the Task TaskInvokeDelegate() delegate.
Create a type InvokeBuilder.cs that encapsulates methods using expression trees to convert the four executor method forms and the VoidInvokeDelegate into a TaskInvokeDelegate.
For example, the following method:
public class CheckImageCodeEventHandler
{
public void Check(MyEvent @event)
{
}
}
Using an expression tree, it can be constructed as the following pseudo-code:
VoidInvokeDelegate invoke = (object target,object[]? parameters) =>
{
(CheckImageCodeEventHandler)target.Check(parameters);
}
TaskInvokeDelegate taskInvoke = Task (object target,object[]? parameters) =>
{
invoke.Invoke(target,paramters);
return Task.CompletedTask;
}
The complete code for InvokeBuilder is as follows:
internal static class InvokeBuilder
{
// Build delegate
public static TaskInvokeDelegate Build(MethodInfo methodInfo, Type targetType)
{
// Step 1: Build parameters (object target, params object?[] parameters)
// Construct executor method parameters
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object?[]), "parameters");
// Build function parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
parameters.Add(valueCast);
}
// Construct method call
var instanceCast = Expression.Convert(targetParameter, targetType);
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);
// Step 2: Convert to TaskInvokeDelegate form
if (methodCall.Type == typeof(void))
{
var lambda = Expression.Lambda<VoidInvokeDelegate>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return delegate (object target, object?[] parameters)
{
voidExecutor(target, parameters);
return Task.CompletedTask;
};
}
else if (methodCall.Type == typeof(Task))
{
var castMethodCall = Expression.Convert(methodCall, typeof(Task));
var lambda = Expression.Lambda<TaskInvokeDelegate>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile();
}
else
{
throw new NotSupportedException($"The return type of the [{methodInfo.Name}] method must be Task or void");
}
}
}
Encapsulating the Invocation Chain
After encapsulating executor methods into a unified delegate structure, the next step is to use expression trees to construct an invocation chain of these executors and cancel executors, executed according to the Order sequence.
Example:
MyEvent => InsertDb => InitUser => SendEmail
CancelInsertDb <= CancelInitUser <= CancelSendEmail
Define two delegates for the execution chain:
// Event delegate used to build the execution chain
public delegate Task EventHandlerDelegate();
// Event delegate with dependency injection, used to build the execution chain
internal delegate Task ServiceEventHandlerDelegate(IServiceProvider provider, params object?[] parameters);
ServiceEventHandlerDelegate is used to build the invocation chain and rollback chain, packaging the execution order of event executors and the rollback order together.
。Pseudocode example as follows:
ServiceEventHandlerDelegate next = async (provider, @params) =>
{
try
{
A => B => C
}
catch
{
C => B => A
}
});
To intercept events, it is also necessary to define an event middleware interceptor interface:
// Event execution middleware, i.e., the interceptor when executing events
public interface IEventMiddleware<in TEvent>
where TEvent : IEvent
{
// @event: the event
// next: the next function to execute
Task HandleAsync(TEvent @event, EventHandlerDelegate next);
}
After wrapping with ServiceEventHandlerDelegate, we only need to execute the ServiceEventHandlerDelegate delegate to invoke the entire chain process.
The role of EventHandlerDelegate is to wrap ServiceEventHandlerDelegate. If developers require using middleware to intercept events, we need to wrap it one more layer and pass it to the middleware interceptor.
if (Middleware != null)
{
var mid = ioc.GetRequiredService<IEventMiddleware<TEvent>>();
EventHandlerDelegate next = async () =>
{
await next(ioc, @event, cancellationToken);
};
await mid.HandleAsync(@event, next);
}
else
{
await next(ioc, @event, cancellationToken);
}
Event Scanning and Registration
Next, we start writing code to handle scanning and registering events. Each executor method is scanned and stored in memory, sorted according to the Order attribute, and the Cancel rollback method is bound at the same time.
To bind an executor and its rollback handler, we need to define a model:
// Used to record a Handler
internal class EventInfo
{
// The class where the executor resides
public Type DeclaringType { get; set; }
// Execution order
public int Order { get; set; }
// Event
public Type EventType { get; set; }
// Executor method
public MethodInfo MethodInfo { get; set; }
// Executor method wrapped as a delegate
public TaskInvokeDelegate TaskInvoke { get; set; }
// Whether it is a cancel operation
public bool IsCancel { get; set; }
// Corresponding cancel executor information
public EventInfo? CancelInfo { get; set; }
public override int GetHashCode()
{
return MethodInfo.GetHashCode();
}
public override bool Equals(object? obj)
{
if (obj is not EventInfo info) return false;
return this.GetHashCode() == info.GetHashCode();
}
}
Next is implementing the event bus and defining the event bus interface:
// Event bus service
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : IEvent;
}
For convenience, the author splits the EventBus type into two parts using a partial class.
The first part of EventBus contains static methods and fields that cache executor information and store the delegate invocation chain.
public partial class EventBus
{
#region static
// Interceptor
private static Type? Middleware;
// Cache all event executors
private static readonly Dictionary<Type, HashSet<EventInfo>> EventCache = new();
// Invocation chain cache
private static readonly Dictionary<Type, ServiceEventHandlerDelegate> HandlerDelegateCache = new();
// Set interceptor
public static void SetMiddleware(Type type)
{
Middleware = type;
}
// Add an executor for an event
public static void AddEventHandler(
Type declaringType, // The class where the executor method resides
int order,
Type eventType, // Which event is bound
MethodInfo method) // Executor method
{
if (!EventCache.TryGetValue(eventType, out var events))
{
events = new HashSet<EventInfo>();
EventCache[eventType] = events;
}
var info = new EventInfo
{
DeclaringType = declaringType,
EventType = eventType,
MethodInfo = method,
IsCancel = false,
Order = order,
// Wrap the method into a unified format
TaskInvoke = InvokeBuilder.Build(method, declaringType)
};
events.Add(info);
// Bind the corresponding rollback handler
var cancelInfo = events.FirstOrDefault(x => x.EventType == eventType && x.Order == order && x.IsCancel == true);
if (cancelInfo != null) info.CancelInfo = cancelInfo;
}
// Add a cancel event executor
public static void AddCancelEventHandler(Type declaringType, int order, Type eventType, MethodInfo method)
{
if (!EventCache.TryGetValue(eventType, out var events))
{
events = new HashSet<EventInfo>();
EventCache[eventType] = events;
}
var cancelInfo = new EventInfo
{
DeclaringType = declaringType,
EventType = eventType,
MethodInfo = method,
IsCancel = true,
Order = order,
TaskInvoke = InvokeBuilder.Build(method, declaringType)
};
events.Add(cancelInfo);
// Bind this rollback handler to the corresponding executor
var info = events.FirstOrDefault(x => x.EventType == eventType && x.Order == order && x.IsCancel == false);
if (info != null) info.CancelInfo = cancelInfo;
}
#endregion
}
The two methods here are straightforward; they store the executor methods into the cache.
Next, in order to build the function invocation chain and perform rollback when execution fails, a core build method needs to be implemented. This method encapsulates all steps into ServiceEventHandlerDelegate.
// Build the event execution chain
private static ServiceEventHandlerDelegate BuildHandler<TEvent>() where TEvent : IEvent
{
if (HandlerDelegateCache.TryGetValue(typeof(TEvent), out var handler)) return handler;
ServiceEventHandlerDelegate next = async (provider, @params) =>
{
var eventData = @params.OfType<Event>().FirstOrDefault();
var cancel = @params.OfType<CancellationToken>().FirstOrDefault();
var logger = provider.GetRequiredService<ILogger<EventBus>>();
logger.LogDebug("Start executing event: {0},{1}", typeof(TEvent).Name, @params[0]);
if (!EventCache.TryGetValue(typeof(TEvent), out var eventInfos)) return;
var infos = eventInfos.Where(x => x.IsCancel == false).OrderBy(x => x.Order).ToArray();
// Wrap execution chain and rollback chain
for (int i = 0; i < infos.Length; i++)
{
var info = infos[i];
if (cancel.IsCancellationRequested)
{
logger.LogDebug("Event execution has been canceled: {0}, position: {1}", typeof(TEvent).Name, info.MethodInfo.Name);
return;
}
logger.LogDebug("Event: {0},=> {1}", typeof(TEvent).Name, info.MethodInfo.Name);
// Build execution chain
var currentService = provider.GetRequiredService(info.DeclaringType);
try
{
await info.TaskInvoke(currentService, @params);
}
// Execution failed, start rollback
catch (Exception ex)
{
logger.LogError(ex, "Event execution failed: {0}, executor:{1},{2}", typeof(TEvent).Name, info.MethodInfo.Name, @params[0]);
for (int j = i; j >= 0; j--)
{
var backInfo = infos[j];
if (backInfo.CancelInfo is not null)
{
await backInfo.CancelInfo.TaskInvoke(currentService, @params);
}
}
return;
}
}
};
// Store in cache
HandlerDelegateCache[typeof(TEvent)] = next;
return next;
}
Next, write the instance method of EventBus to implement the event publishing method defined in IEventBus.
// Event bus
public partial class EventBus : IEventBus
{
private readonly IServiceProvider _provider;
public EventBus(IServiceProvider serviceProvider)
{
_provider = serviceProvider;
}
// Publish event
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : IEvent
{
var handler = BuildHandler<TEvent>();
if (Middleware != null)
{
var mid = _provider.GetRequiredService<IEventMiddleware<TEvent>>();
EventHandlerDelegate next = async () =>
{
await handler(_provider, @event, cancellationToken);
};
await mid.HandleAsync(@event, next);
}
else
{
await handler(_provider, @event, cancellationToken);
}
}
}
Finally, in order to scan event executors when the program starts and register them into the event bus, an extension method for service registration needs to be implemented:
public static class EventBusExtensions
{
// Add event bus extension
public static void AddEventBus(this IServiceCollection services, Type? middleware = null)
{
services.AddScoped<IEventBus, EventBus>();
if (middleware is not null)
{
EventBus.SetMiddleware(middleware);
services.TryAddEnumerable(new ServiceDescriptor(typeof(IEventMiddleware<>), middleware, lifetime: ServiceLifetime.Transient));
}
var assemblies = AppDomain.CurrentDomain.GetAssemblies();
foreach (var assembly in assemblies)
{
foreach (var type in assembly.GetTypes())
{
if (type.CustomAttributes.Any(x => x.AttributeType == typeof(EventAttribute)))
{
GetEventHandler(services, type);
}
}
}
}
// Scan executors in the class
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void GetEventHandler(IServiceCollection services, Type type)
{
services.AddScoped(type);
var methods = type.GetMethods(BindingFlags.Public | BindingFlags.Instance);
foreach (var method in methods)
{
var attr = method.GetCustomAttribute<EventHandlerAttribute>();
if (attr == null) return;
var parameters = method.GetParameters();
if (parameters.Length == 0) throw new Exception($"{method.Name} definition is incorrect, it must contain at least one parameter");
var eventType = parameters[0].ParameterType;
if (!(eventType.IsSubclassOf(typeof(Event)) || eventType.GetInterface(typeof(IEvent).Name) != null))
throw new Exception($"{method.Name} definition is incorrect, the first parameter must be an event");
if (!attr.IsCancel) EventBus.AddEventHandler(type, attr.Order, eventType, method);
else EventBus.AddCancelEventHandler(type, attr.Order, eventType, method);
}
}
}
Handling Events with Transactions
The example code in this section references the Demo7.Tran project, which uses EFCore to store data in a Sqlite database.
public class MyContext : DbContext
{
public DbSet<AccountEntity> Account { get; set; }
public MyContext(DbContextOptions<MyContext> dbContext) : base(dbContext)
{
Database.EnsureCreated();
}
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlite(
@"filename=my.db");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<AccountEntity>()
.Property(b => b.Id)
.ValueGeneratedOnAdd();
}
}
[PrimaryKey(nameof(Id))]
[Index(nameof(EMail), IsUnique = true)]
public class AccountEntity
{
public int Id { get; set; }
public string Name { get; set; }
public string EMail { get; set; }
public bool VerifyEMail { get; set; }
}
Since one event may have multiple executor methods, although corresponding rollback methods can be executed when an event fails, if the program crashes when preparing to roll back, it may lead to data inconsistency. To solve this problem, database transaction handling can be added in the event middleware.
Define an event middleware and open and roll back transactions inside the middleware.
。
public class TranMiddleware<TEvent> : IEventMiddleware<TEvent> where TEvent : IEvent
{
private readonly MyContext _context;
public TranMiddleware(MyContext context)
{
_context = context;
}
public async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{
Console.WriteLine("----- Handling command {0} ({1})", @event.GetType().Name, @event.ToString());
using var tran = await _context.Database.BeginTransactionAsync();
try
{
await next();
await tran.CommitAsync();
}
catch (Exception)
{
await tran.RollbackAsync();
throw;
}
}
}
Write the related event to insert data into the database:
[Event]
public class UserRegisterEventHandler
{
private readonly MyContext _context;
public UserRegisterEventHandler(MyContext context)
{
_context = context;
}
[EventHandler(Order = 0)]
public async Task InsertDb(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 1)
{
await _context.Account.AddAsync(new AccountEntity
{
Name = @event.Name,
EMail = @event.EMail,
});
await _context.SaveChangesAsync();
Console.WriteLine("√ User information has been added to the database");
}
else throw new Exception("× Failed to write user information to the database");
}
}
When performing dependency injection, register this middleware:
static async Task Main(string[] args)
{
var ioc = new ServiceCollection();
ioc.AddDbContext<MyContext>();
ioc.AddLogging(build => build.AddConsole());
ioc.AddEventBus(typeof(TranMiddleware<>));
var services = ioc.BuildServiceProvider();
var eventBus = services.GetRequiredService<IEventBus>();
await eventBus.PublishAsync(new MyEvent()
{
Name = "Gongliang",
EMail = "gongliang@maomi.com"
});
}
文章评论