说明
作者:痴者工良
文档地址:https://mmq.whuanle.cn
仓库地址:https://github.com/whuanle/Maomi.MQ
作者博客:
o https://www.whuanle.cn o https://www.cnblogs.com/whuanle
导读
Maomi.MQ 是一个简化了消息队列使用方式的通讯框架,目前支持了 RabbitMQ。
Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。
此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。
快速开始
在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。
创建一个 Web 项目(可参考 WebDemo 项目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:
// using Maomi.MQ;
// using RabbitMQ.Client;
builder.Services.AddMaomiMQ((MqOptionsBuilder options)=>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},[typeof(Program).Assembly]);
var app = builder.Build();
o WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。 每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。 o AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。 o Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory。
定义消息模型类,模型类是 MQ 通讯的消息基础,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。
public classTestEvent
{
publicintId{get;set;}
public override string ToString()
{
returnId.ToString();
}
}
定义消费者,消费者需要实现 IConsumer
接口,以及使用 [Consumer]
特性注解配置消费者属性,如下所示,[Consumer("test")]
表示该消费者订阅的队列名称是 test
。
IConsumer
接口有三个方法,ExecuteAsync
方法用于处理消息,FaildAsync
会在 ExecuteAsync
异常时立即执行,如果代码一直异常,最终会调用 FallbackAsync
方法,Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费,或者做其它处理动作。
[Consumer("test")]
publicclassMyConsumer:IConsumer<TestEvent>
{
// 消费
publicasyncTaskExecuteAsync(MessageHeader messageHeader,TestEvent message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
awaitTask.CompletedTask;
}
// 每次消费失败时执行
publicTaskFaildAsync(MessageHeader messageHeader,Exception ex,int retryCount,TestEvent message)
=>Task.CompletedTask;
// 补偿
publicTask<ConsumerState>FallbackAsync(MessageHeader messageHeader,TestEvent? message,Exception? ex)
=>Task.FromResult(ConsumerState.Ack);
}
Maomi.MQ 还具有多种消费者模式,代码写法不一样,后续会详细讲解不同的消费者模式。
如果要发布消息,只需要注入 IMessagePublisher 服务即可。
[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;
publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"test", message:newTestEvent
{
Id=123
});
return"ok";
}
}
启动 Web 服务,在 swagger 页面上请求 API 接口,MyConsumer 服务会立即接收到发布的消息。
如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包,以便让消费者在后台订阅队列消费消息。
参考 ConsoleDemo 项目。
using Maomi.MQ;
usingMicrosoft.Extensions.Hosting;
usingMicrosoft.Extensions.Logging;
usingRabbitMQ.Client;
usingSystem.Reflection;
var host =newHostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},newSystem.Reflection.Assembly[]{typeof(Program).Assembly});
}).Build();
// 后台运行
var task = host.RunAsync();
Console.ReadLine();
消息发布者
消息发布者用于推送消息到 RabbitMQ 服务器中,Maomi.MQ 支持多种消息发布者模式,支持 RabbitMQ 事务模式等,示例项目请参考 PublisherWeb。
Maomi.MQ 通过 IMessagePublisher 向开发者提供消息推送服务。
在发布消息之前,需要定义一个事件模型类,用于传递消息。
public classTestEvent
{
publicintId{get;set;}
public override string ToString()
{
returnId.ToString();
}
}
然后注入 IMessagePublisher 服务,发布消息:
[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;
publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
});
}
return"ok";
}
}
一般情况下,一个模型类只应该被一个消费者所使用,那么通过事件可以找到唯一的消费者,也就是通过事件类型找到消费者的 IConsumerOptions,此时框架可以使用对应的配置发送消息。
TestMessageEvent 模型只有一个消费者:
[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
// ... ...
}
可以直接发送事件,不需要填写交换器(Exchange)和路由键(RoutingKey)。
[HttpGet("publish_message")]
publicasyncTask<string>PublisherMessage()
{
// 如果在本项目中 TestMessageEvent 只指定了一个消费者,那么通过 TestMessageEvent 自动寻找对应的配置
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(model:newTestMessageEvent
{
Id= i
});
}
return"ok";
}
IMessagePublisher
IMessagePublisher 是 Maomi.MQ 的基础消息发布接口,有以下方法:
// 消息发布者.
publicinterfaceIMessagePublisher
{
TaskPublishAsync<TMessage>(string exchange, // 交换器名称.
string routingKey,// 队列/路由键名称.
TMessage message, // 事件对象.
Action<BasicProperties> properties,
CancellationToken cancellationToken =default)
whereTMessage:class;
TaskPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
TaskPublishAsync<TMessage>(TMessage message,
Action<BasicProperties>? properties =,
CancellationToken cancellationToken =default)
whereTMessage:class;
TaskPublishAsync<TMessage>(TMessage model,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
TaskCustomPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
}
Maomi.MQ 的消息发布接口就这么几个,由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以接口比较简单,开发者使用接口时可以灵活一些,使用难度也不大。
BasicProperties 是 RabbitMQ 中的消息基础属性对象,直接面向开发者,可以消息的发布和消费变得灵活和丰富功能,例如,可以通过 BasicProperties 配置单条消息的过期时间:
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
},(BasicProperties p)=>
{
p.Expiration="1000";
});
Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Scoped:
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();
开发者也可以自行实现 IMessagePublisher 接口,实现自己的消息发布模型,具体示例请参考 DefaultMessagePublisher 类型。
原生通道
开发者可以通过 ConnectionPool
服务获取原生连接对象,直接在 IConnection 上使用 RabbitMQ 的接口发布消息:
private readonly ConnectionPool _connectionPool;
var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);
常驻内存连接对象
Maomi.MQ 通过 ConnectionPool 管理
本文暂时没有评论,来添加一个吧(●'◡'●)