快速开始在本篇教程中将介绍 Maomi.MQ.RabbitMQ 的使用方法以便读者能够快速了解该框架的使用方式和特点。Maomi.MQ.RabbitMQ 是一个基于 RabbitMQ 的消息队列封装框架提供了很多开箱即用的功能通过简单灵活的方式简化消息传输流程提供一系列可靠的消息传输保障机制降低开发者使用难度减少开发时间。主要功能简化的消息定义和消费者无需复杂配置即可上手。丰富灵活的配置发挥 RabbitMQ 的强大力量自动创建队列、死信队列、广播模式、Qos 并发控制、动态路由、动态消费者。自定义序列化器支持 Json、Protobuf、Thrift、MessagePack 二进制消息传输高性能压缩消息减少内存、提高并发量、跨微服务传输。支持自动和自定义发布消息推送交换器支持 RabbitMQ 事务推送模式。简化的消费者模式、事件总线模式、广播模式、动态消费者还支持结合 MeditaR、FastEndpoints 框架使用进一步减少使用负担减少代码侵入性。自定义重试策略从容应对服务错误、强一致性消息、高并发流量。支持本地消息表模式强一致性保证业务消息不丢失解决空悬挂、重复处理等异步消息异常问题保证业务可靠性。自由的消费者模式除了 MeditaR、FastEndpoints还可以自由接入其它框架充分利用第三方框架的优质能力。快速配置创建一个 Web 项目可参考 WebDemo 项目引入 Maomi.MQ.RabbitMQ 包在 Web 配置中注入服务// using Maomi.MQ; // using RabbitMQ.Client; builder.Services.AddMaomiMQ((MqOptionsBuilder options) { options.WorkId 1; options.AppName myapp; options.Rabbit (ConnectionFactory options) { options.HostName Environment.GetEnvironmentVariable(RABBITMQ)!; options.Port 5672; }; }, [typeof(Program).Assembly]); var app builder.Build();WorkId 指定用于生成分布式雪花 id 的节点 id默认为 0。每条消息生成一个唯一的 id便于追踪。如果不设置雪花id在分布式服务中多实例并行工作时可能会产生相同的 id。AppName用于标识在日志和链路追踪中标识消息的生产者或消费者。RabbitRabbitMQ 客户端配置请参考 ConnectionFactory。定义消息模型类模型类是 MQ 通讯的消息基础该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。public class TestEvent { public int Id { get; set; } public override string ToString() { return Id.ToString(); } }定义消费者消费者需要实现IConsumerTEvent接口以及使用[Consumer]特性注解配置消费者属性如下所示[Consumer(test)]表示该消费者订阅的队列名称是test。IConsumerTEvent接口有三个方法ExecuteAsync方法用于处理消息FaildAsync会在ExecuteAsync异常时立即执行如果代码一直异常最终会调用FallbackAsync方法Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费或者做其它处理动作。[Consumer(test)] public class MyConsumer : IConsumerTestEvent { // 消费 public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message) { Console.WriteLine($事件 id: {message.Id} {DateTime.Now}); await Task.CompletedTask; } // 每次消费失败时执行 public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message) Task.CompletedTask; // 补偿 public TaskConsumerState FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) Task.FromResult( ConsumerState.Ack); }Maomi.MQ 还具有多种消费者模式代码写法不一样后续会详细讲解不同的消费者模式。如果要发布消息只需要注入 IMessagePublisher 服务即可。private readonly IMessagePublisher _messagePublisher; public IndexController(IMessagePublisher messagePublisher) { _messagePublisher messagePublisher; } [HttpGet(publish)] public async Taskstring Publisher() { // 发布消息 await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: test, message: new TestEvent { Id 123 }); return ok; }启动 Web 服务在 swagger 页面上请求 API 接口MyConsumer 服务会立即接收到发布的消息。就是这么简单就是这么方便。怎么发布消息自动发布虽然发布者和消费者共用一个模型类但是在一个项目中怎么配置模型类都不会影响消费者。将分布者消费者隔离简化框架设计和微服务解耦支持不同的编程语言服务相互通讯共同完成业务逻辑。例如为了简化消息发布我们可以在模型类指定绑定的路由键。[RouterKey(scenario.quickstart)] public sealed class QuickStartMessage { }发布时是根据模型类的[RouterKey]自动找到要推送的交换器和路由键简化发布消息的参数。await _publisher.AutoPublishAsync(message);当然手动设置也可以await _publisher.PublishAsync(string.Empty, request.Queue, message);消费者可以自由设置要消费的队列即使是相同的模型类可以自由设定队列名称不会产生干扰。[Consumer(scenario.quickstart)] public sealed class QuickStartConsumer : IConsumerQuickStartMessage { }手动发布适用于你需要精细控制exchange/routingKey或消息属性TTL、Header、优先级等的场景。[HttpPost(publish-manual)] public async TaskIResult PublishManual() { var message new OrderCreatedMessage { OrderNo SO-20260211-002, Amount 299.00m }; await _publisher.PublishAsync( exchange: biz.order.exchange, routingKey: order.created.v1, message: message, properties: p { p.Expiration 60000; p.Headers ?? new Dictionarystring, object?(); p.Headers[tenant] tenant-a; }); return Results.Ok(message); }但是 Maomi.MQ.RabbitMQ 提供了更为简单易用的方式实现自动处理队列属性。例如要设计一个死信队列你只需要在消费者上设置属性即可[Consumer( example.retry.main, RetryFaildRequeue false, DeadExchange , DeadRoutingKey example.retry.dead)] public sealed class RetryConsumer : IConsumerRetryMessage { }RetryFaildRequeue表示消费失败后不会放回原队列。DeadExchange死信交换器名称。DeadRoutingKey死信队列路由键。所以发布消息时只需要使用这两行代码await _publisher.PublishAsync(string.Empty, request.Queue, message); await _publisher.AutoPublishAsync(message);普通消费者、动态消费者、事件总线Maomi.MQ 支持以多种姿势创建消费者一个项目里面可以同时使用多种消费者模式自由灵活而不会冲突。这三种模式不是互斥关系而是处理问题的方式不同普通消费者最基础的消费模式继承IConsumerTMessage即可使用还可以自由扩展出不同类型的消费者模式例如 MediatR 。动态消费者也是继承IConsumerTMessage 运行时动态创建/停止订阅。事件总线一个消息触发多个有顺序的处理步骤生成执行链和回滚链路。普通消费者模式实现IConsumerTMessage接口即可可以自定义消费、重试、回滚逻辑这种模式使用简单还能从容处理消息错误和回滚。正常处理消息会调用ExecuteAsync失败后调用FaildAsync重试耗尽进入FallbackAsync。[Consumer(biz.order.created.v1, Qos 10, RetryFaildRequeue false)] public sealed class NormalOrderConsumer : IConsumerOrderCreatedMessage { public Task ExecuteAsync(MessageHeader messageHeader, OrderCreatedMessage message) { Console.WriteLine($normal consume {message.OrderNo}); return Task.CompletedTask; } public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderCreatedMessage message) Task.CompletedTask; public TaskConsumerState FallbackAsync(MessageHeader messageHeader, OrderCreatedMessage? message, Exception? ex) Task.FromResult(ConsumerState.Ack); }动态消费者模式你可以在运行时将业务需要订阅的消息队列动态注册例如 SAAS 平台新建租户后需要动态按照租户前缀消费对应的主题也可以自行取消订阅。public sealed class DynamicDemoService { private readonly IDynamicConsumer _dynamicConsumer; public DynamicDemoService(IDynamicConsumer dynamicConsumer) { _dynamicConsumer dynamicConsumer; } public async Taskstring StartAsync(string queue) { var options new ConsumerAttribute(queue) { Qos 5 }; var consumerTag await _dynamicConsumer.ConsumerAsyncOrderCreatedMessage( options, execute: (header, message) { Console.WriteLine($dynamic consume {message.OrderNo}); return Task.CompletedTask; }, faild: (header, ex, retryCount, message) Task.CompletedTask, fallback: (header, message, ex) Task.FromResult(ConsumerState.Ack)); return consumerTag; } public Task StopByQueueAsync(string queue) _dynamicConsumer.StopConsumerAsync(queue); }事件总线模式事件总线模式可以自由编排事件执行链路框架会按照链路自动执行并在执行失败后自动执行回滚链路。IEventMiddlewareT负责构建执行链[EventOrder]控制步骤顺序适合一个事件拆成多个业务步骤能够很好将业务解耦。using Maomi.MQ.EventBus; [RouterKey(biz.order.pipeline.v1)] public sealed class OrderPipelineEvent { public Guid OrderId { get; set; } Guid.NewGuid(); public decimal Amount { get; set; } } [Consumer(biz.order.pipeline.v1)] public sealed class OrderPipelineMiddleware : IEventMiddlewareOrderPipelineEvent { public Task ExecuteAsync(MessageHeader messageHeader, OrderPipelineEvent message, EventHandlerDelegateOrderPipelineEvent next) { return next(messageHeader, message, CancellationToken.None); } public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderPipelineEvent? message) Task.CompletedTask; public TaskConsumerState FallbackAsync(MessageHeader messageHeader, OrderPipelineEvent? message, Exception? ex) Task.FromResult(ConsumerState.Ack); } [EventOrder(1)] public sealed class ReserveInventoryHandler : IEventHandlerOrderPipelineEvent { public Task ExecuteAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; public Task CancelAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; } [EventOrder(2)] public sealed class CreateBillHandler : IEventHandlerOrderPipelineEvent { public Task ExecuteAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; public Task CancelAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; }广播模式例如为了减少延时和提供性能在 Redis 做完缓存后还需要做本地缓存。但是如果数据发生变更怎么刷新本地缓存呢那就使用广播模式同一个服务的不同实例使用广播模式时每个实例都可以收到消息而不是随机分配给其中一个。代码非常简单设置IsBroadcast true即可这样即使是同一个服务的不同实例也会收到广播通知该实例下线会自动取消订阅不会消耗服务器资源。[Consumer(scenario.quickstart, IsBroadcast true)] public sealed class QuickStartConsumer : IConsumerQuickStartMessage { }高性能序列化器Maomi.MQ 默认使用 JSON 做序列化数据传输你也可以引入其它序列化器提高压缩消息的性能。目前支持 System.Text.Json、Protobuf、Thrift、MessagePack 四种二进制序列化协议你可以自由选择组合使用不同的序列化器到项目中以便在不同微服务中传递消息并且实现高性能传递消息。例如使用 protobuf-net 框架 识别标记了[ProtoContract]的模型类那么此类型使用 Protobuf 协议压缩消息其它消息还是走 JSON。using ProtoBuf; builder.Services.AddMaomiMQ(options { options.MessageSerializers serializers { // 添加 Protobuf 序列化器 serializers.Insert(0, new ProtobufMessageSerializer()); }; }, [typeof(Program).Assembly]); [ProtoContract] public sealed class PersonMessage { [ProtoMember(1)] public Guid Id { get; set; } Guid.NewGuid(); [ProtoMember(2)] public string Name { get; set; } string.Empty; [ProtoMember(3)] public int Age { get; set; } }强一致性事务模式借鉴 CAP 等框架的本地消息表模式通过 MQ 和本地消息表实现简单的强一致性的分布式事务在业务不太复杂的企业项目中可以简化编写事务的难度不同的微服务以轻量、简洁、不复杂的模式接入降低了编码和维护难度。配置以 MySQL 为例using Maomi.MQ.Transaction.Mysql; using MySqlConnector; builder.Services.AddMaomiMQTransactionMySql(); builder.Services.AddMaomiMQTransaction(options { options.ProviderName TransactionProviderNames.MySql; options.Connection _ new MySqlConnection(builder.Configuration.GetConnectionString(Default)); options.AutoCreateTable true; });业务代码发布消息public sealed class OrderAppService { private readonly IMessagePublisher _publisher; private readonly string _connectionString; public OrderAppService(IMessagePublisher publisher, IConfiguration configuration) { _publisher publisher; _connectionString configuration.GetConnectionString(Default)!; } public async Task CreateOrderAsync(CancellationToken cancellationToken) { await using var connection new MySqlConnection(_connectionString); await connection.OpenAsync(cancellationToken); await using var transaction await connection.BeginTransactionAsync(cancellationToken); // 1) 执行业务 SQL // await SaveOrderAsync(connection, transaction, ...); await transaction.CommitAsync(cancellationToken); // 2) 发送消息放在事务外面 await _publisher.AutoPublishAsync(new OrderCreatedMessage { OrderNo SO-TX-001, Amount 520m }, cancellationToken: cancellationToken); } }注意本地事务模式不是全局分布式事务协调器它解决的是“单库与消息发送”的一致性。如果事务提交了但是最后发送消息失败那么后台会有一个 BackgroundService 定期扫描数据库将这些没有发送成功的消息推送到 RabbitMQ。结合 MediatR如果你的业务已经使用 MediatR可以直接把 MQ 当作 MediatR 命令输入通道也就是原有的代码完全不需要改动只需要在 Command 上加上[MediatRConsumer]即可。using Maomi.MQ.MediatR; using MediatR; [MediatRConsumer(biz.mediatr.order, Qos 1)] public sealed class SyncOrderCommand : IRequest { public string OrderNo { get; set; } string.Empty; } public sealed class SyncOrderCommandHandler : IRequestHandlerSyncOrderCommand { public Task Handle(SyncOrderCommand request, CancellationToken cancellationToken) Task.CompletedTask; } // 通过 MediatR 触发 MQ 发布 await mediator.Send(new MediatRMqCommandSyncOrderCommand { Message new SyncOrderCommand { OrderNo SO-MED-001 } });原理MediatRTypeFilter会把带MediatRConsumer的命令类型映射成 MQ 消费者消费后再转发给IMediator.Send(...)。或者你可以继续使用 IMessagePublisher 发布消息。await _publisher.PublishAsync(string.Empty, request.Queue, message);如果你的项目已经引入了 MediatR那么不需要为了使用 RabbitMQ 再搞出别的消费模式代码使用 Maomi.MQ 可以直接简化接入 RabbitMQ 的麻烦继续以 MediatR 的模式实现异步消费。结合 FastEndpoints如果你使用 FastEndpoints也可以通过类型过滤器把IEvent/ICommand接入 MQ。using Maomi.MQ.Filters; app.Services.RegisterGenericCommand(typeof(FastEndpointsMqCommand), typeof(FastEndpointsMqCommandHandler)); app.UseFastEndpoints(); [FastEndpointsConsumer(biz.fast.order.paid, Qos 1)] public sealed class OrderPaidEvent : FastEndpoints.IEvent { public string OrderNo { get; set; } string.Empty; }await _messagePublisher.AutoPublishAsync(new OrderPaidEvent { OrderNo SO-FE-001 });其它能力下面这些能力这里先简述后续你可以按需深入
Maomi.MQ 功能强大的 .NET RabbitMQ 消息队列通讯模型框架来了
快速开始在本篇教程中将介绍 Maomi.MQ.RabbitMQ 的使用方法以便读者能够快速了解该框架的使用方式和特点。Maomi.MQ.RabbitMQ 是一个基于 RabbitMQ 的消息队列封装框架提供了很多开箱即用的功能通过简单灵活的方式简化消息传输流程提供一系列可靠的消息传输保障机制降低开发者使用难度减少开发时间。主要功能简化的消息定义和消费者无需复杂配置即可上手。丰富灵活的配置发挥 RabbitMQ 的强大力量自动创建队列、死信队列、广播模式、Qos 并发控制、动态路由、动态消费者。自定义序列化器支持 Json、Protobuf、Thrift、MessagePack 二进制消息传输高性能压缩消息减少内存、提高并发量、跨微服务传输。支持自动和自定义发布消息推送交换器支持 RabbitMQ 事务推送模式。简化的消费者模式、事件总线模式、广播模式、动态消费者还支持结合 MeditaR、FastEndpoints 框架使用进一步减少使用负担减少代码侵入性。自定义重试策略从容应对服务错误、强一致性消息、高并发流量。支持本地消息表模式强一致性保证业务消息不丢失解决空悬挂、重复处理等异步消息异常问题保证业务可靠性。自由的消费者模式除了 MeditaR、FastEndpoints还可以自由接入其它框架充分利用第三方框架的优质能力。快速配置创建一个 Web 项目可参考 WebDemo 项目引入 Maomi.MQ.RabbitMQ 包在 Web 配置中注入服务// using Maomi.MQ; // using RabbitMQ.Client; builder.Services.AddMaomiMQ((MqOptionsBuilder options) { options.WorkId 1; options.AppName myapp; options.Rabbit (ConnectionFactory options) { options.HostName Environment.GetEnvironmentVariable(RABBITMQ)!; options.Port 5672; }; }, [typeof(Program).Assembly]); var app builder.Build();WorkId 指定用于生成分布式雪花 id 的节点 id默认为 0。每条消息生成一个唯一的 id便于追踪。如果不设置雪花id在分布式服务中多实例并行工作时可能会产生相同的 id。AppName用于标识在日志和链路追踪中标识消息的生产者或消费者。RabbitRabbitMQ 客户端配置请参考 ConnectionFactory。定义消息模型类模型类是 MQ 通讯的消息基础该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。public class TestEvent { public int Id { get; set; } public override string ToString() { return Id.ToString(); } }定义消费者消费者需要实现IConsumerTEvent接口以及使用[Consumer]特性注解配置消费者属性如下所示[Consumer(test)]表示该消费者订阅的队列名称是test。IConsumerTEvent接口有三个方法ExecuteAsync方法用于处理消息FaildAsync会在ExecuteAsync异常时立即执行如果代码一直异常最终会调用FallbackAsync方法Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费或者做其它处理动作。[Consumer(test)] public class MyConsumer : IConsumerTestEvent { // 消费 public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message) { Console.WriteLine($事件 id: {message.Id} {DateTime.Now}); await Task.CompletedTask; } // 每次消费失败时执行 public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message) Task.CompletedTask; // 补偿 public TaskConsumerState FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) Task.FromResult( ConsumerState.Ack); }Maomi.MQ 还具有多种消费者模式代码写法不一样后续会详细讲解不同的消费者模式。如果要发布消息只需要注入 IMessagePublisher 服务即可。private readonly IMessagePublisher _messagePublisher; public IndexController(IMessagePublisher messagePublisher) { _messagePublisher messagePublisher; } [HttpGet(publish)] public async Taskstring Publisher() { // 发布消息 await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: test, message: new TestEvent { Id 123 }); return ok; }启动 Web 服务在 swagger 页面上请求 API 接口MyConsumer 服务会立即接收到发布的消息。就是这么简单就是这么方便。怎么发布消息自动发布虽然发布者和消费者共用一个模型类但是在一个项目中怎么配置模型类都不会影响消费者。将分布者消费者隔离简化框架设计和微服务解耦支持不同的编程语言服务相互通讯共同完成业务逻辑。例如为了简化消息发布我们可以在模型类指定绑定的路由键。[RouterKey(scenario.quickstart)] public sealed class QuickStartMessage { }发布时是根据模型类的[RouterKey]自动找到要推送的交换器和路由键简化发布消息的参数。await _publisher.AutoPublishAsync(message);当然手动设置也可以await _publisher.PublishAsync(string.Empty, request.Queue, message);消费者可以自由设置要消费的队列即使是相同的模型类可以自由设定队列名称不会产生干扰。[Consumer(scenario.quickstart)] public sealed class QuickStartConsumer : IConsumerQuickStartMessage { }手动发布适用于你需要精细控制exchange/routingKey或消息属性TTL、Header、优先级等的场景。[HttpPost(publish-manual)] public async TaskIResult PublishManual() { var message new OrderCreatedMessage { OrderNo SO-20260211-002, Amount 299.00m }; await _publisher.PublishAsync( exchange: biz.order.exchange, routingKey: order.created.v1, message: message, properties: p { p.Expiration 60000; p.Headers ?? new Dictionarystring, object?(); p.Headers[tenant] tenant-a; }); return Results.Ok(message); }但是 Maomi.MQ.RabbitMQ 提供了更为简单易用的方式实现自动处理队列属性。例如要设计一个死信队列你只需要在消费者上设置属性即可[Consumer( example.retry.main, RetryFaildRequeue false, DeadExchange , DeadRoutingKey example.retry.dead)] public sealed class RetryConsumer : IConsumerRetryMessage { }RetryFaildRequeue表示消费失败后不会放回原队列。DeadExchange死信交换器名称。DeadRoutingKey死信队列路由键。所以发布消息时只需要使用这两行代码await _publisher.PublishAsync(string.Empty, request.Queue, message); await _publisher.AutoPublishAsync(message);普通消费者、动态消费者、事件总线Maomi.MQ 支持以多种姿势创建消费者一个项目里面可以同时使用多种消费者模式自由灵活而不会冲突。这三种模式不是互斥关系而是处理问题的方式不同普通消费者最基础的消费模式继承IConsumerTMessage即可使用还可以自由扩展出不同类型的消费者模式例如 MediatR 。动态消费者也是继承IConsumerTMessage 运行时动态创建/停止订阅。事件总线一个消息触发多个有顺序的处理步骤生成执行链和回滚链路。普通消费者模式实现IConsumerTMessage接口即可可以自定义消费、重试、回滚逻辑这种模式使用简单还能从容处理消息错误和回滚。正常处理消息会调用ExecuteAsync失败后调用FaildAsync重试耗尽进入FallbackAsync。[Consumer(biz.order.created.v1, Qos 10, RetryFaildRequeue false)] public sealed class NormalOrderConsumer : IConsumerOrderCreatedMessage { public Task ExecuteAsync(MessageHeader messageHeader, OrderCreatedMessage message) { Console.WriteLine($normal consume {message.OrderNo}); return Task.CompletedTask; } public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderCreatedMessage message) Task.CompletedTask; public TaskConsumerState FallbackAsync(MessageHeader messageHeader, OrderCreatedMessage? message, Exception? ex) Task.FromResult(ConsumerState.Ack); }动态消费者模式你可以在运行时将业务需要订阅的消息队列动态注册例如 SAAS 平台新建租户后需要动态按照租户前缀消费对应的主题也可以自行取消订阅。public sealed class DynamicDemoService { private readonly IDynamicConsumer _dynamicConsumer; public DynamicDemoService(IDynamicConsumer dynamicConsumer) { _dynamicConsumer dynamicConsumer; } public async Taskstring StartAsync(string queue) { var options new ConsumerAttribute(queue) { Qos 5 }; var consumerTag await _dynamicConsumer.ConsumerAsyncOrderCreatedMessage( options, execute: (header, message) { Console.WriteLine($dynamic consume {message.OrderNo}); return Task.CompletedTask; }, faild: (header, ex, retryCount, message) Task.CompletedTask, fallback: (header, message, ex) Task.FromResult(ConsumerState.Ack)); return consumerTag; } public Task StopByQueueAsync(string queue) _dynamicConsumer.StopConsumerAsync(queue); }事件总线模式事件总线模式可以自由编排事件执行链路框架会按照链路自动执行并在执行失败后自动执行回滚链路。IEventMiddlewareT负责构建执行链[EventOrder]控制步骤顺序适合一个事件拆成多个业务步骤能够很好将业务解耦。using Maomi.MQ.EventBus; [RouterKey(biz.order.pipeline.v1)] public sealed class OrderPipelineEvent { public Guid OrderId { get; set; } Guid.NewGuid(); public decimal Amount { get; set; } } [Consumer(biz.order.pipeline.v1)] public sealed class OrderPipelineMiddleware : IEventMiddlewareOrderPipelineEvent { public Task ExecuteAsync(MessageHeader messageHeader, OrderPipelineEvent message, EventHandlerDelegateOrderPipelineEvent next) { return next(messageHeader, message, CancellationToken.None); } public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, OrderPipelineEvent? message) Task.CompletedTask; public TaskConsumerState FallbackAsync(MessageHeader messageHeader, OrderPipelineEvent? message, Exception? ex) Task.FromResult(ConsumerState.Ack); } [EventOrder(1)] public sealed class ReserveInventoryHandler : IEventHandlerOrderPipelineEvent { public Task ExecuteAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; public Task CancelAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; } [EventOrder(2)] public sealed class CreateBillHandler : IEventHandlerOrderPipelineEvent { public Task ExecuteAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; public Task CancelAsync(OrderPipelineEvent message, CancellationToken cancellationToken) Task.CompletedTask; }广播模式例如为了减少延时和提供性能在 Redis 做完缓存后还需要做本地缓存。但是如果数据发生变更怎么刷新本地缓存呢那就使用广播模式同一个服务的不同实例使用广播模式时每个实例都可以收到消息而不是随机分配给其中一个。代码非常简单设置IsBroadcast true即可这样即使是同一个服务的不同实例也会收到广播通知该实例下线会自动取消订阅不会消耗服务器资源。[Consumer(scenario.quickstart, IsBroadcast true)] public sealed class QuickStartConsumer : IConsumerQuickStartMessage { }高性能序列化器Maomi.MQ 默认使用 JSON 做序列化数据传输你也可以引入其它序列化器提高压缩消息的性能。目前支持 System.Text.Json、Protobuf、Thrift、MessagePack 四种二进制序列化协议你可以自由选择组合使用不同的序列化器到项目中以便在不同微服务中传递消息并且实现高性能传递消息。例如使用 protobuf-net 框架 识别标记了[ProtoContract]的模型类那么此类型使用 Protobuf 协议压缩消息其它消息还是走 JSON。using ProtoBuf; builder.Services.AddMaomiMQ(options { options.MessageSerializers serializers { // 添加 Protobuf 序列化器 serializers.Insert(0, new ProtobufMessageSerializer()); }; }, [typeof(Program).Assembly]); [ProtoContract] public sealed class PersonMessage { [ProtoMember(1)] public Guid Id { get; set; } Guid.NewGuid(); [ProtoMember(2)] public string Name { get; set; } string.Empty; [ProtoMember(3)] public int Age { get; set; } }强一致性事务模式借鉴 CAP 等框架的本地消息表模式通过 MQ 和本地消息表实现简单的强一致性的分布式事务在业务不太复杂的企业项目中可以简化编写事务的难度不同的微服务以轻量、简洁、不复杂的模式接入降低了编码和维护难度。配置以 MySQL 为例using Maomi.MQ.Transaction.Mysql; using MySqlConnector; builder.Services.AddMaomiMQTransactionMySql(); builder.Services.AddMaomiMQTransaction(options { options.ProviderName TransactionProviderNames.MySql; options.Connection _ new MySqlConnection(builder.Configuration.GetConnectionString(Default)); options.AutoCreateTable true; });业务代码发布消息public sealed class OrderAppService { private readonly IMessagePublisher _publisher; private readonly string _connectionString; public OrderAppService(IMessagePublisher publisher, IConfiguration configuration) { _publisher publisher; _connectionString configuration.GetConnectionString(Default)!; } public async Task CreateOrderAsync(CancellationToken cancellationToken) { await using var connection new MySqlConnection(_connectionString); await connection.OpenAsync(cancellationToken); await using var transaction await connection.BeginTransactionAsync(cancellationToken); // 1) 执行业务 SQL // await SaveOrderAsync(connection, transaction, ...); await transaction.CommitAsync(cancellationToken); // 2) 发送消息放在事务外面 await _publisher.AutoPublishAsync(new OrderCreatedMessage { OrderNo SO-TX-001, Amount 520m }, cancellationToken: cancellationToken); } }注意本地事务模式不是全局分布式事务协调器它解决的是“单库与消息发送”的一致性。如果事务提交了但是最后发送消息失败那么后台会有一个 BackgroundService 定期扫描数据库将这些没有发送成功的消息推送到 RabbitMQ。结合 MediatR如果你的业务已经使用 MediatR可以直接把 MQ 当作 MediatR 命令输入通道也就是原有的代码完全不需要改动只需要在 Command 上加上[MediatRConsumer]即可。using Maomi.MQ.MediatR; using MediatR; [MediatRConsumer(biz.mediatr.order, Qos 1)] public sealed class SyncOrderCommand : IRequest { public string OrderNo { get; set; } string.Empty; } public sealed class SyncOrderCommandHandler : IRequestHandlerSyncOrderCommand { public Task Handle(SyncOrderCommand request, CancellationToken cancellationToken) Task.CompletedTask; } // 通过 MediatR 触发 MQ 发布 await mediator.Send(new MediatRMqCommandSyncOrderCommand { Message new SyncOrderCommand { OrderNo SO-MED-001 } });原理MediatRTypeFilter会把带MediatRConsumer的命令类型映射成 MQ 消费者消费后再转发给IMediator.Send(...)。或者你可以继续使用 IMessagePublisher 发布消息。await _publisher.PublishAsync(string.Empty, request.Queue, message);如果你的项目已经引入了 MediatR那么不需要为了使用 RabbitMQ 再搞出别的消费模式代码使用 Maomi.MQ 可以直接简化接入 RabbitMQ 的麻烦继续以 MediatR 的模式实现异步消费。结合 FastEndpoints如果你使用 FastEndpoints也可以通过类型过滤器把IEvent/ICommand接入 MQ。using Maomi.MQ.Filters; app.Services.RegisterGenericCommand(typeof(FastEndpointsMqCommand), typeof(FastEndpointsMqCommandHandler)); app.UseFastEndpoints(); [FastEndpointsConsumer(biz.fast.order.paid, Qos 1)] public sealed class OrderPaidEvent : FastEndpoints.IEvent { public string OrderNo { get; set; } string.Empty; }await _messagePublisher.AutoPublishAsync(new OrderPaidEvent { OrderNo SO-FE-001 });其它能力下面这些能力这里先简述后续你可以按需深入