using Microsoft.AspNetCore.Connections;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Channels;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;
namespace AmrControl.mq
{
///
/// rabbitmq服务
///
public class RabbitmqService : IRabbitmqService, IDisposable
{
private const string EXCHANGE_NAME = "wcs-exchange";
private const string COMMON_TOPIC = "wcs-handle";
///
/// 确认处理
///
private static ConcurrentDictionary> outstandingConfirms = new ConcurrentDictionary>();
///
/// 配置文件
///
private readonly IConfiguration _configuration;
///
/// 启动状态
///
private volatile bool started;
///
/// 连接工厂
///
private ConnectionFactory factory;
///
/// mq连接
///
private IConnection _connection;
///
/// 通道处理
///
private IModel _channel;
private static volatile RabbitmqService instance = null;
public RabbitmqService(IConfiguration configuration)
{
this._configuration = configuration;
instance = this;
}
///
/// 获取实例
///
///
public static RabbitmqService getInstance()
{
return instance;
}
public void Dispose()
{
started = false;
_channel?.Dispose();
_connection?.Dispose();
}
public void Init()
{
if(!started)
{
started = true;
factory = new ConnectionFactory()
{
HostName = _configuration["mq:host"],
Port = int.Parse(_configuration["mq:port"]),
UserName = _configuration["mq:username"],
Password = _configuration["mq:password"],
//ClientProperties = new Dictionary
//{
// ["publisher-confirm-type"] = "correlated",
// ["publisher-returns"] = true
//}
};
connect();
checkAndReConn();
}
}
///
/// 启动连接检查
///
///
async Task checkAndReConn()
{
while(started)
{
if(!isConnect())
{
connect();
}
await Task.Delay(TimeSpan.FromSeconds(10));
}
}
///
/// 判断连接状态是否正常
///
///
private bool isConnect()
{
return _connection != null && _connection.IsOpen && _channel != null && _channel.IsOpen;
}
///
/// 连接rabbitmq
///
private void connect()
{
try
{
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
//声明交换机,以发布订阅方式发送消息
_channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, true);
} catch
{
Console.WriteLine("connect failed!");
}
}
public void SendMsg(string topic, MqMessage msg)
{
//允许消息持久化到磁盘
var properties = _channel.CreateBasicProperties();
if(msg.msgType != 3)
{
//工位消息不做持久化处理
properties.Persistent = true;
}
//启用发布确认模式
_channel.ConfirmSelect();
_channel.BasicPublish(EXCHANGE_NAME, topic, properties, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg)));
// 添加消息发送失败的回调函数
_channel.BasicReturn += (sender, args) =>
{
Console.WriteLine("消息找不到投送的queue,投送失败,消息内容: {0}", Encoding.UTF8.GetString(args.Body.ToArray()));
};
// 添加消息确认回调函数
_channel.BasicAcks += (sender, args) =>
{
Console.WriteLine("消息发送成功");
};
//拒绝处理监听
_channel.BasicNacks += (sender, ea) =>
{
Console.WriteLine("客户端拒绝接收消息");
};
}
///
/// 通用topic
///
///
public string getCommonTopic()
{
return COMMON_TOPIC;
}
public void SendMsg(MqMessage msg)
{
SendMsg(COMMON_TOPIC, msg);
}
}
}