RabbitmqService.cs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. using Microsoft.AspNetCore.Connections;
  2. using Newtonsoft.Json;
  3. using RabbitMQ.Client;
  4. using System.Collections.Concurrent;
  5. using System.Text;
  6. using System.Threading.Channels;
  7. using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;
  8. namespace AmrControl.mq
  9. {
  10. /// <summary>
  11. /// rabbitmq服务
  12. /// </summary>
  13. public class RabbitmqService : IRabbitmqService, IDisposable
  14. {
  15. private const string EXCHANGE_NAME = "wcs-exchange";
  16. private const string COMMON_TOPIC = "wcs-handle";
  17. /// <summary>
  18. /// 确认处理
  19. /// </summary>
  20. private static ConcurrentDictionary<ulong, TaskCompletionSource<byte>> outstandingConfirms = new ConcurrentDictionary<ulong, TaskCompletionSource<byte>>();
  21. /// <summary>
  22. /// 配置文件
  23. /// </summary>
  24. private readonly IConfiguration _configuration;
  25. /// <summary>
  26. /// 启动状态
  27. /// </summary>
  28. private volatile bool started;
  29. /// <summary>
  30. /// 连接工厂
  31. /// </summary>
  32. private ConnectionFactory factory;
  33. /// <summary>
  34. /// mq连接
  35. /// </summary>
  36. private IConnection _connection;
  37. /// <summary>
  38. /// 通道处理
  39. /// </summary>
  40. private IModel _channel;
  41. private static volatile RabbitmqService instance = null;
  42. public RabbitmqService(IConfiguration configuration)
  43. {
  44. this._configuration = configuration;
  45. instance = this;
  46. }
  47. /// <summary>
  48. /// 获取实例
  49. /// </summary>
  50. /// <returns></returns>
  51. public static RabbitmqService getInstance()
  52. {
  53. return instance;
  54. }
  55. public void Dispose()
  56. {
  57. started = false;
  58. _channel?.Dispose();
  59. _connection?.Dispose();
  60. }
  61. public void Init()
  62. {
  63. if(!started)
  64. {
  65. started = true;
  66. factory = new ConnectionFactory()
  67. {
  68. HostName = _configuration["mq:host"],
  69. Port = int.Parse(_configuration["mq:port"]),
  70. UserName = _configuration["mq:username"],
  71. Password = _configuration["mq:password"],
  72. //ClientProperties = new Dictionary<string, object>
  73. //{
  74. // ["publisher-confirm-type"] = "correlated",
  75. // ["publisher-returns"] = true
  76. //}
  77. };
  78. connect();
  79. checkAndReConn();
  80. }
  81. }
  82. /// <summary>
  83. /// 启动连接检查
  84. /// </summary>
  85. /// <returns></returns>
  86. async Task checkAndReConn()
  87. {
  88. while(started)
  89. {
  90. if(!isConnect())
  91. {
  92. connect();
  93. }
  94. await Task.Delay(TimeSpan.FromSeconds(10));
  95. }
  96. }
  97. /// <summary>
  98. /// 判断连接状态是否正常
  99. /// </summary>
  100. /// <returns></returns>
  101. private bool isConnect()
  102. {
  103. return _connection != null && _connection.IsOpen && _channel != null && _channel.IsOpen;
  104. }
  105. /// <summary>
  106. /// 连接rabbitmq
  107. /// </summary>
  108. private void connect()
  109. {
  110. try
  111. {
  112. _connection = factory.CreateConnection();
  113. _channel = _connection.CreateModel();
  114. //声明交换机,以发布订阅方式发送消息
  115. _channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Topic, true);
  116. } catch
  117. {
  118. Console.WriteLine("connect failed!");
  119. }
  120. }
  121. public void SendMsg<T>(string topic, MqMessage<T> msg)
  122. {
  123. //允许消息持久化到磁盘
  124. var properties = _channel.CreateBasicProperties();
  125. if(msg.msgType != 3)
  126. {
  127. //工位消息不做持久化处理
  128. properties.Persistent = true;
  129. }
  130. //启用发布确认模式
  131. _channel.ConfirmSelect();
  132. _channel.BasicPublish(EXCHANGE_NAME, topic, properties, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(msg)));
  133. // 添加消息发送失败的回调函数
  134. _channel.BasicReturn += (sender, args) =>
  135. {
  136. Console.WriteLine("消息找不到投送的queue,投送失败,消息内容: {0}", Encoding.UTF8.GetString(args.Body.ToArray()));
  137. };
  138. // 添加消息确认回调函数
  139. _channel.BasicAcks += (sender, args) =>
  140. {
  141. Console.WriteLine("消息发送成功");
  142. };
  143. //拒绝处理监听
  144. _channel.BasicNacks += (sender, ea) =>
  145. {
  146. Console.WriteLine("客户端拒绝接收消息");
  147. };
  148. }
  149. /// <summary>
  150. /// 通用topic
  151. /// </summary>
  152. /// <returns></returns>
  153. public string getCommonTopic()
  154. {
  155. return COMMON_TOPIC;
  156. }
  157. public void SendMsg<T>(MqMessage<T> msg)
  158. {
  159. SendMsg(COMMON_TOPIC, msg);
  160. }
  161. }
  162. }