using Microsoft.AspNetCore.Connections;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Channels;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;

namespace AmrControl.mq
{
    /// <summary>
    /// RabbitMQ service: owns one connection and one channel, declares the
    /// durable topic exchange "wcs-exchange" and publishes JSON-serialized
    /// <see cref="MqMessage"/> instances to it. A background loop re-creates
    /// the connection when it is found closed.
    /// </summary>
    public class RabbitmqService : IRabbitmqService, IDisposable
    {
        private const string EXCHANGE_NAME = "wcs-exchange";
        private const string COMMON_TOPIC = "wcs-handle";

        /// <summary>
        /// Outstanding publisher confirms.
        /// </summary>
        // NOTE(review): the generic type arguments were missing from the source as
        // received; <ulong, string> (delivery tag -> message body) is assumed — confirm.
        // The field is not referenced anywhere else in this file.
        private static ConcurrentDictionary<ulong, string> outstandingConfirms = new ConcurrentDictionary<ulong, string>();

        /// <summary>
        /// Application configuration (source of the mq:* settings).
        /// </summary>
        private readonly IConfiguration _configuration;

        /// <summary>
        /// True once <see cref="Init"/> has run and until <see cref="Dispose"/> is called.
        /// </summary>
        private volatile bool started;

        /// <summary>
        /// Connection factory built from configuration in <see cref="Init"/>.
        /// </summary>
        private ConnectionFactory factory;

        /// <summary>
        /// Current broker connection (null until the first successful connect).
        /// </summary>
        private IConnection _connection;

        /// <summary>
        /// Current channel (null until the first successful connect).
        /// </summary>
        private IModel _channel;

        private static volatile RabbitmqService instance = null;

        /// <summary>
        /// Creates the service and records it as the instance returned by <see cref="getInstance"/>.
        /// </summary>
        /// <param name="configuration">Configuration providing mq:host, mq:port, mq:username and mq:password.</param>
        public RabbitmqService(IConfiguration configuration)
        {
            this._configuration = configuration;
            instance = this;
        }

        /// <summary>
        /// Gets the most recently constructed instance.
        /// </summary>
        /// <returns>The last constructed <see cref="RabbitmqService"/>, or null if none has been constructed.</returns>
        public static RabbitmqService getInstance()
        {
            return instance;
        }

        /// <summary>
        /// Stops the reconnect loop and releases the channel and the connection.
        /// </summary>
        public void Dispose()
        {
            started = false;
            disposeQuietly(_channel);
            disposeQuietly(_connection);
        }

        /// <summary>
        /// Builds the connection factory from configuration, connects once and starts
        /// the background reconnect loop. Calls after the first successful one are no-ops.
        /// </summary>
        public void Init()
        {
            if (started)
            {
                return;
            }

            // Build the factory before flipping the flag: if configuration parsing
            // throws, the service is not left marked as started without a factory.
            factory = new ConnectionFactory()
            {
                HostName = _configuration["mq:host"],
                Port = int.Parse(_configuration["mq:port"]),
                UserName = _configuration["mq:username"],
                Password = _configuration["mq:password"],
                //ClientProperties = new Dictionary<string, object>
                //{
                //    ["publisher-confirm-type"] = "correlated",
                //    ["publisher-returns"] = true
                //}
            };
            started = true;

            connect();

            // Fire-and-forget on purpose: the loop runs until Dispose() clears 'started'.
            _ = checkAndReConn();
        }

        /// <summary>
        /// Connection watchdog: while the service is started, checks the connection
        /// and reconnects if needed, then waits 10 seconds before the next check.
        /// </summary>
        /// <returns>A task that completes when the service is no longer started.</returns>
        async Task checkAndReConn()
        {
            while (started)
            {
                if (!isConnect())
                {
                    connect();
                }

                await Task.Delay(TimeSpan.FromSeconds(10));
            }
        }

        /// <summary>
        /// Tells whether both the connection and the channel exist and are open.
        /// </summary>
        /// <returns>True when the connection and the channel are both non-null and open.</returns>
        private bool isConnect()
        {
            return _connection != null && _connection.IsOpen && _channel != null && _channel.IsOpen;
        }

        /// <summary>
        /// Connects to RabbitMQ: creates a connection and a channel, declares the
        /// exchange, enables publisher confirms and registers the publish callbacks.
        /// Failures are logged to the console and leave the current fields untouched,
        /// so the reconnect loop can try again later.
        /// </summary>
        private void connect()
        {
            IConnection newConnection = null;
            IModel newChannel = null;
            try
            {
                newConnection = factory.CreateConnection();
                newChannel = newConnection.CreateModel();

                // Declare a durable topic exchange; messages are sent publish/subscribe style.
                newChannel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, true);

                // Confirm mode and callbacks are set up once per channel rather than
                // once per published message, so handlers do not accumulate.
                newChannel.ConfirmSelect();
                registerPublishCallbacks(newChannel);
            }
            catch (Exception ex)
            {
                Console.WriteLine("connect failed! {0}", ex.Message);

                // Release whatever was created before the failure.
                disposeQuietly(newChannel);
                disposeQuietly(newConnection);
                return;
            }

            IModel oldChannel = _channel;
            IConnection oldConnection = _connection;

            _connection = newConnection;
            _channel = newChannel;

            // The previous (closed) channel/connection are no longer reachable; release them.
            disposeQuietly(oldChannel);
            disposeQuietly(oldConnection);
        }

        /// <summary>
        /// Attaches the return / ack / nack logging callbacks to a channel.
        /// </summary>
        /// <param name="channel">The newly created channel.</param>
        private static void registerPublishCallbacks(IModel channel)
        {
            // Callback for messages the broker returns because no queue could be found.
            // NOTE(review): brokers only return unroutable messages that were published
            // with the mandatory flag, which SendMsg does not set — confirm intent.
            channel.BasicReturn += (sender, args) =>
            {
                Console.WriteLine("消息找不到投送的queue,投送失败,消息内容: {0}", Encoding.UTF8.GetString(args.Body.ToArray()));
            };

            // Callback for positive publisher confirms.
            channel.BasicAcks += (sender, args) =>
            {
                Console.WriteLine("消息发送成功");
            };

            // Callback for negative publisher confirms.
            channel.BasicNacks += (sender, ea) =>
            {
                Console.WriteLine("客户端拒绝接收消息");
            };
        }

        /// <summary>
        /// Disposes a resource, logging instead of propagating any exception so that
        /// cleanup of one resource never prevents cleanup of the next.
        /// </summary>
        /// <param name="resource">The resource to dispose; may be null.</param>
        private static void disposeQuietly(IDisposable resource)
        {
            try
            {
                resource?.Dispose();
            }
            catch (Exception ex)
            {
                Console.WriteLine("dispose failed! {0}", ex.Message);
            }
        }

        /// <summary>
        /// Serializes a message to JSON (UTF-8) and publishes it to the exchange.
        /// </summary>
        /// <param name="topic">Routing key to publish with.</param>
        /// <param name="msg">Message to send. Messages whose msgType is 3 are not marked persistent.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        /// <exception cref="InvalidOperationException">No channel has been created yet.</exception>
        public void SendMsg(string topic, MqMessage msg)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }

            // Read the field once: the reconnect loop may replace it concurrently.
            IModel channel = _channel;
            if (channel == null)
            {
                throw new InvalidOperationException("RabbitMQ channel is not available; Init() must have connected successfully before sending.");
            }

            var properties = channel.CreateBasicProperties();
            if (msg.msgType != 3)
            {
                // Station messages (msgType 3) are not persisted; everything else is
                // marked persistent so it can be written to disk.
                properties.Persistent = true;
            }

            byte[] body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));
            channel.BasicPublish(EXCHANGE_NAME, topic, properties, body);
        }

        /// <summary>
        /// Gets the common topic.
        /// </summary>
        /// <returns>The routing key used by <see cref="SendMsg(MqMessage)"/>.</returns>
        public string getCommonTopic()
        {
            return COMMON_TOPIC;
        }

        /// <summary>
        /// Publishes a message using the common topic as routing key.
        /// </summary>
        /// <param name="msg">Message to send.</param>
        public void SendMsg(MqMessage msg)
        {
            SendMsg(COMMON_TOPIC, msg);
        }
    }
}