程序员的知识教程库

网站首页 > 教程分享 正文

Maomi.MQ 2.0 | 功能强大的 .NET 消息队列通讯模型框架

henian88 2025-03-13 15:40:12 教程分享 31 ℃ 0 评论

说明

作者:痴者工良

文档地址:https://mmq.whuanle.cn

仓库地址:https://github.com/whuanle/Maomi.MQ

作者博客:

  • o https://www.whuanle.cn
  • o https://www.cnblogs.com/whuanle

导读

Maomi.MQ 是一个简化了消息队列使用方式的通讯框架,目前支持了 RabbitMQ。

Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。

此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。

快速开始

在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。


创建一个 Web 项目(可参考 WebDemo 项目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:


// using Maomi.MQ;
// using RabbitMQ.Client;

builder.Services.AddMaomiMQ((MqOptionsBuilder options)=>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},[typeof(Program).Assembly]);

var app = builder.Build();

  • o WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
    每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。
  • o AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
  • o Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

定义消息模型类,模型类是 MQ 通讯的消息基础,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

定义消费者,消费者需要实现 IConsumer 接口,以及使用 [Consumer] 特性注解配置消费者属性,如下所示,[Consumer("test")] 表示该消费者订阅的队列名称是 test

IConsumer 接口有三个方法,ExecuteAsync 方法用于处理消息,FaildAsync 会在 ExecuteAsync 异常时立即执行,如果代码一直异常,最终会调用 FallbackAsync 方法,Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费,或者做其它处理动作。


[Consumer("test")]
publicclassMyConsumer:IConsumer<TestEvent>
{
// 消费
publicasyncTaskExecuteAsync(MessageHeader messageHeader,TestEvent message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
awaitTask.CompletedTask;
}

// 每次消费失败时执行
publicTaskFaildAsync(MessageHeader messageHeader,Exception ex,int retryCount,TestEvent message)
=>Task.CompletedTask;

// 补偿
publicTask<ConsumerState>FallbackAsync(MessageHeader messageHeader,TestEvent? message,Exception? ex)
=>Task.FromResult(ConsumerState.Ack);
}

Maomi.MQ 还具有多种消费者模式,代码写法不一样,后续会详细讲解不同的消费者模式。


如果要发布消息,只需要注入 IMessagePublisher 服务即可。


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"test", message:newTestEvent
{
Id=123
});
return"ok";
}
}

启动 Web 服务,在 swagger 页面上请求 API 接口,MyConsumer 服务会立即接收到发布的消息。


如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包,以便让消费者在后台订阅队列消费消息。

参考 ConsoleDemo 项目。


using Maomi.MQ;
usingMicrosoft.Extensions.Hosting;
usingMicrosoft.Extensions.Logging;
usingRabbitMQ.Client;
usingSystem.Reflection;

var host =newHostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId=1;
options.AppName="myapp";
options.Rabbit=(ConnectionFactory options)=>
{
options.HostName=Environment.GetEnvironmentVariable("RABBITMQ")!;
options.Port=5672;
options.ClientProvidedName=Assembly.GetExecutingAssembly().GetName().Name;
};
},newSystem.Reflection.Assembly[]{typeof(Program).Assembly});

}).Build();

// 后台运行
var task = host.RunAsync();

Console.ReadLine();

消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中,Maomi.MQ 支持多种消息发布者模式,支持 RabbitMQ 事务模式等,示例项目请参考 PublisherWeb

Maomi.MQ 通过 IMessagePublisher 向开发者提供消息推送服务。


在发布消息之前,需要定义一个事件模型类,用于传递消息。


public classTestEvent
{
publicintId{get;set;}

public override string ToString()
{
returnId.ToString();
}
}

然后注入 IMessagePublisher 服务,发布消息:


[ApiController]
[Route("[controller]")]
publicclassIndexController:ControllerBase
{
privatereadonlyIMessagePublisher _messagePublisher;

publicIndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}

[HttpGet("publish")]
publicasyncTask<string>Publisher()
{
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
});
}

return"ok";
}
}

一般情况下,一个模型类只应该被一个消费者所使用,那么通过事件可以找到唯一的消费者,也就是通过事件类型找到消费者的 IConsumerOptions,此时框架可以使用对应的配置发送消息。

TestMessageEvent 模型只有一个消费者:


[Consumer("publish", Qos = 1, RetryFaildRequeue = true)]
public class TestEventConsumer : IConsumer<TestMessageEvent>
{
// ... ...
}

可以直接发送事件,不需要填写交换器(Exchange)和路由键(RoutingKey)。


[HttpGet("publish_message")]
publicasyncTask<string>PublisherMessage()
{
// 如果在本项目中 TestMessageEvent 只指定了一个消费者,那么通过 TestMessageEvent 自动寻找对应的配置
for(var i =0; i <100; i++)
{
await _messagePublisher.PublishAsync(model:newTestMessageEvent
{
Id= i
});
}

return"ok";
}

IMessagePublisher

IMessagePublisher 是 Maomi.MQ 的基础消息发布接口,有以下方法:


// 消息发布者.
publicinterfaceIMessagePublisher
{
TaskPublishAsync<TMessage>(string exchange, // 交换器名称.
string routingKey,// 队列/路由键名称.
TMessage message, // 事件对象.
Action<BasicProperties> properties,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskPublishAsync<TMessage>(TMessage message,
Action<BasicProperties>? properties =,
CancellationToken cancellationToken =default)
whereTMessage:class;

TaskPublishAsync<TMessage>(TMessage model,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);

TaskCustomPublishAsync<TMessage>(string exchange,
string routingKey,
TMessage message,
BasicProperties? properties =default,
CancellationToken cancellationToken =default);
}

Maomi.MQ 的消息发布接口就这么几个,由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以接口比较简单,开发者使用接口时可以灵活一些,使用难度也不大。


BasicProperties 是 RabbitMQ 中的消息基础属性对象,直接面向开发者,可以消息的发布和消费变得灵活和丰富功能,例如,可以通过 BasicProperties 配置单条消息的过期时间:


await _messagePublisher.PublishAsync(exchange:string.Empty, routingKey:"publish", message:newTestEvent
{
Id= i
},(BasicProperties p)=>
{
p.Expiration="1000";
});

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Scoped:


services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

开发者也可以自行实现 IMessagePublisher 接口,实现自己的消息发布模型,具体示例请参考 DefaultMessagePublisher 类型。

原生通道

开发者可以通过 ConnectionPool 服务获取原生连接对象,直接在 IConnection 上使用 RabbitMQ 的接口发布消息:


private readonly ConnectionPool _connectionPool;

var connectionObject = _connectionPool.Get();
connectionObject.DefaultChannel.BasicPublishAsync(... ...);

常驻内存连接对象

Maomi.MQ 通过 ConnectionPool 管理

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表