123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- using Microsoft.AspNetCore.Connections;
- using Newtonsoft.Json;
- using RabbitMQ.Client;
- using System.Collections.Concurrent;
- using System.Text;
- using System.Threading.Channels;
- using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;
- namespace AmrControl.mq
- {
/// <summary>
/// RabbitMQ publishing service. Owns one connection and one channel, declares the
/// durable topic exchange <c>wcs-exchange</c>, and publishes JSON-serialized messages to it.
/// A background loop re-creates the connection every 10 seconds while it is found closed.
/// </summary>
public class RabbitmqService : IRabbitmqService, IDisposable
{
    private const string EXCHANGE_NAME = "wcs-exchange";
    private const string COMMON_TOPIC = "wcs-handle";

    /// <summary>
    /// Confirm-handling table. No method of this class reads or writes it at present.
    /// </summary>
    private static ConcurrentDictionary<ulong, TaskCompletionSource<byte>> outstandingConfirms = new ConcurrentDictionary<ulong, TaskCompletionSource<byte>>();

    /// <summary>
    /// Most recently constructed instance, returned by <see cref="getInstance"/>.
    /// </summary>
    private static volatile RabbitmqService instance = null;

    /// <summary>
    /// Application configuration; the keys mq:host, mq:port, mq:username and mq:password are read in <see cref="Init"/>.
    /// </summary>
    private readonly IConfiguration _configuration;

    /// <summary>
    /// Guards <see cref="_connection"/>/<see cref="_channel"/> and serializes publishing,
    /// because the reconnect loop may replace the channel while another thread is sending.
    /// </summary>
    private readonly object _sync = new object();

    /// <summary>
    /// True between a successful <see cref="Init"/> and <see cref="Dispose"/>.
    /// </summary>
    private volatile bool started;

    /// <summary>
    /// Connection factory built from configuration in <see cref="Init"/>.
    /// </summary>
    private ConnectionFactory factory;

    /// <summary>
    /// Current broker connection (null until the first successful connect).
    /// </summary>
    private IConnection _connection;

    /// <summary>
    /// Current channel (null until the first successful connect).
    /// </summary>
    private IModel _channel;

    /// <summary>
    /// Creates the service and registers it as the instance returned by <see cref="getInstance"/>.
    /// </summary>
    /// <param name="configuration">Configuration that provides the mq:* settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
    public RabbitmqService(IConfiguration configuration)
    {
        this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        instance = this;
    }

    /// <summary>
    /// Returns the most recently constructed instance, or null if none has been constructed.
    /// </summary>
    /// <returns>The registered instance.</returns>
    public static RabbitmqService getInstance()
    {
        return instance;
    }

    /// <summary>
    /// Stops the reconnect loop and releases the channel and the connection.
    /// Safe to call more than once; never throws.
    /// </summary>
    public void Dispose()
    {
        IModel channel;
        IConnection connection;
        lock (_sync)
        {
            started = false;
            channel = _channel;
            connection = _connection;
            _channel = null;
            _connection = null;
        }

        // Dispose outside the lock so a slow broker shutdown cannot block senders.
        disposeQuietly(channel);
        disposeQuietly(connection);
    }

    /// <summary>
    /// Builds the connection factory from configuration, performs the first connection
    /// attempt and starts the background reconnect loop. Subsequent calls are no-ops
    /// while the service is started.
    /// </summary>
    /// <exception cref="FormatException">mq:port is present but is not a valid integer.</exception>
    /// <exception cref="OverflowException">mq:port is outside the range of <see cref="int"/>.</exception>
    public void Init()
    {
        lock (_sync)
        {
            if (started)
            {
                return;
            }

            var newFactory = new ConnectionFactory()
            {
                HostName = _configuration["mq:host"],
                UserName = _configuration["mq:username"],
                Password = _configuration["mq:password"],
            };

            // A missing port keeps the factory's own default instead of failing the whole start-up.
            var portText = _configuration["mq:port"];
            if (!string.IsNullOrWhiteSpace(portText))
            {
                newFactory.Port = int.Parse(portText, System.Globalization.CultureInfo.InvariantCulture);
            }

            // Only flip the flag once configuration was parsed successfully,
            // so a failed Init can be retried.
            factory = newFactory;
            started = true;
        }

        connect();
        // Fire-and-forget by design: the loop ends on its own once Dispose() clears 'started'.
        _ = checkAndReConn();
    }

    /// <summary>
    /// Background loop: every 10 seconds, reconnects if the connection or channel is not open.
    /// </summary>
    /// <returns>A task that completes after the service has been stopped.</returns>
    async Task checkAndReConn()
    {
        while (started)
        {
            await Task.Delay(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
            if (started && !isConnect())
            {
                connect();
            }
        }
    }

    /// <summary>
    /// Reports whether both the connection and the channel exist and are open.
    /// </summary>
    /// <returns>True when the service can currently publish.</returns>
    private bool isConnect()
    {
        lock (_sync)
        {
            return _connection != null && _connection.IsOpen && _channel != null && _channel.IsOpen;
        }
    }

    /// <summary>
    /// Opens a new connection and channel, configures the channel, and swaps it in
    /// for the previous one, which is then disposed. Failures are logged and leave the
    /// previous state untouched so the reconnect loop can try again.
    /// </summary>
    private void connect()
    {
        IConnection newConnection = null;
        IModel newChannel = null;
        try
        {
            newConnection = factory.CreateConnection();
            newChannel = newConnection.CreateModel();
            // Declare the durable topic exchange used for publish/subscribe delivery.
            newChannel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, true);
            // Publisher confirms and the callbacks are set up once per channel, before any publish.
            newChannel.ConfirmSelect();
            // Raised when a mandatory message could not be routed to any queue.
            newChannel.BasicReturn += (sender, args) =>
            {
                Console.WriteLine("消息找不到投送的queue,投送失败,消息内容: {0}", Encoding.UTF8.GetString(args.Body.ToArray()));
            };
            // Raised when the broker confirms a published message.
            newChannel.BasicAcks += (sender, args) =>
            {
                Console.WriteLine("消息发送成功");
            };
            // Raised when the broker negatively acknowledges a published message.
            newChannel.BasicNacks += (sender, ea) =>
            {
                Console.WriteLine("客户端拒绝接收消息");
            };
        }
        catch (Exception ex)
        {
            Console.WriteLine("connect failed!");
            Console.WriteLine(ex);
            // Do not leak a half-built connection when a later step failed.
            disposeQuietly(newChannel);
            disposeQuietly(newConnection);
            return;
        }

        IConnection staleConnection;
        IModel staleChannel;
        lock (_sync)
        {
            if (!started)
            {
                // Dispose() ran while we were connecting: discard what we just created.
                staleConnection = newConnection;
                staleChannel = newChannel;
            }
            else
            {
                staleConnection = _connection;
                staleChannel = _channel;
                _connection = newConnection;
                _channel = newChannel;
            }
        }

        disposeQuietly(staleChannel);
        disposeQuietly(staleConnection);
    }

    /// <summary>
    /// Disposes a resource, logging instead of propagating any exception it throws.
    /// </summary>
    /// <param name="resource">Resource to dispose; null is ignored.</param>
    private static void disposeQuietly(IDisposable resource)
    {
        if (resource == null)
        {
            return;
        }

        try
        {
            resource.Dispose();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> to JSON (UTF-8) and publishes it to the exchange
    /// with <paramref name="topic"/> as routing key. Messages are marked persistent unless
    /// their msgType is 3. The message is published as mandatory so the broker returns it
    /// (and the return callback logs it) when no queue is bound for the topic.
    /// </summary>
    /// <typeparam name="T">Payload type of the message.</typeparam>
    /// <param name="topic">Routing key to publish with.</param>
    /// <param name="msg">Message to send.</param>
    /// <exception cref="InvalidOperationException">No channel is available (not initialized, never connected, or disposed).</exception>
    public void SendMsg<T>(string topic, MqMessage<T> msg)
    {
        // Serialize before taking the lock to keep the critical section short.
        byte[] body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg));

        lock (_sync)
        {
            var channel = _channel;
            if (channel == null)
            {
                throw new InvalidOperationException("RabbitMQ channel is not available; call Init() and wait for a successful connection.");
            }

            var properties = channel.CreateBasicProperties();
            if (msg.msgType != 3)
            {
                // Station messages (type 3) are not persisted; everything else is written to disk.
                properties.Persistent = true;
            }

            channel.BasicPublish(EXCHANGE_NAME, topic, true, properties, body);
        }
    }

    /// <summary>
    /// Returns the common topic name.
    /// </summary>
    /// <returns>The constant <c>wcs-handle</c>.</returns>
    public string getCommonTopic()
    {
        return COMMON_TOPIC;
    }

    /// <summary>
    /// Publishes <paramref name="msg"/> on the common topic.
    /// </summary>
    /// <typeparam name="T">Payload type of the message.</typeparam>
    /// <param name="msg">Message to send.</param>
    public void SendMsg<T>(MqMessage<T> msg)
    {
        SendMsg(COMMON_TOPIC, msg);
    }
}
- }
|