MqttClient.cs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. using AmrControl.workstation;
  2. using MQTTnet;
  3. using MQTTnet.Client;
  4. using MQTTnet.Protocol;
  5. using Newtonsoft.Json;
  6. using SixLabors.ImageSharp.ColorSpaces.Companding;
  7. namespace AmrControl.Clients
  8. {
  /// <summary>
  /// MQTT client wrapper: connects to the broker described by the "Mqtt:*" configuration
  /// keys, subscribes to the configured topic, forwards every received message to
  /// <see cref="MsWorkstation"/>, and tries to reconnect after a disconnect.
  /// </summary>
  /// <remarks>
  /// The original author notes that repeatedly reading the configuration file causes
  /// memory problems (described as a current .NET issue), so configuration content must
  /// not be referenced directly at run time. For that reason every "Mqtt:*" value is
  /// read exactly once, in the constructor, and cached in a field.
  /// Diagnostics are written to the console because no logger is wired into this class.
  /// </remarks>
  public class MqttClient
  {
      /// <summary>
      /// Pause between a disconnect notification and the reconnect attempt.
      /// </summary>
      private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(2);

      /// <summary>
      /// Guards <see cref="_reconnecting"/> so only one reconnect runs at a time.
      /// </summary>
      private readonly object _reconnectGate = new object();

      /// <summary>
      /// Service provider used to resolve <see cref="MsWorkstation"/> when a message arrives.
      /// </summary>
      private readonly IServiceProvider _service;

      /// <summary>
      /// Value of "Mqtt:id"; used as the MQTT client id and as the format argument of the subscribe topic.
      /// </summary>
      private readonly string _clientId;

      /// <summary>
      /// Value of "Mqtt:send-topic"; default topic for <see cref="SendToMqtt(string)"/>.
      /// </summary>
      private readonly string _sendTopic;

      /// <summary>
      /// Value of "Mqtt:sub-topic"; topic (format string) subscribed after connecting.
      /// </summary>
      private readonly string _subTopic;

      /// <summary>
      /// Value of "Mqtt:url"; broker host.
      /// </summary>
      private readonly string _host;

      /// <summary>
      /// Raw value of "Mqtt:port"; parsed on connect so that a bad value surfaces as a logged connect failure.
      /// </summary>
      private readonly string _portText;

      /// <summary>
      /// The underlying MQTTnet client; null while no client exists.
      /// </summary>
      private IMqttClient _client;

      /// <summary>
      /// Connection state: set when the connected event fires, cleared on connect failure and at reconnect start.
      /// </summary>
      private volatile bool _connected;

      /// <summary>
      /// True while a reconnect attempt is running.
      /// </summary>
      private bool _reconnecting;

      /// <summary>
      /// Creates the wrapper and caches all "Mqtt:*" settings. Does not connect.
      /// </summary>
      /// <param name="configuration">Configuration providing the "Mqtt:id", "Mqtt:send-topic", "Mqtt:sub-topic", "Mqtt:url" and "Mqtt:port" keys.</param>
      /// <param name="service">Service provider used to resolve <see cref="MsWorkstation"/>.</param>
      public MqttClient(IConfiguration configuration, IServiceProvider service)
      {
          _service = service;
          _clientId = configuration["Mqtt:id"];          // client id
          _sendTopic = configuration["Mqtt:send-topic"]; // default publish topic
          _subTopic = configuration["Mqtt:sub-topic"];   // subscribe topic (format string)
          _host = configuration["Mqtt:url"];             // broker host
          _portText = configuration["Mqtt:port"];        // broker port, parsed on connect
      }

      /// <summary>
      /// Builds a new client, connects it to the broker and subscribes to the configured
      /// topic. Does nothing when a connected client already exists.
      /// </summary>
      /// <remarks>
      /// Blocks the calling thread until the connect attempt finishes. Failures are not
      /// thrown to the caller; they are written to the console and the connection state
      /// is cleared.
      /// </remarks>
      public void CreateClient()
      {
          try
          {
              IMqttClient existing = _client;
              if (_connected && existing != null && existing.IsConnected)
              {
                  // An already running, connected client is left alone.
                  return;
              }

              // Drop the stale instance before building a replacement.
              _client = null;
              ReleaseClient(existing);

              // Credentials are not sent; add WithCredentials(...) here if the broker requires authentication.
              MqttClientOptions options = new MqttClientOptionsBuilder()
                  .WithTcpServer(_host, int.Parse(_portText))
                  .WithClientId(_clientId) // the client id must be unique on the broker
                  .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
                  .WithCleanSession(true)
                  .WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
                  .WithTimeout(new TimeSpan(0, 0, 10))
                  .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
                  .Build();

              IMqttClient client = new MqttFactory().CreateMqttClient();
              client.ApplicationMessageReceivedAsync += ReceivedHandle;
              client.ConnectedAsync += ConnectedHandle;
              client.DisconnectedAsync += DisconnectedHandle;
              _client = client;

              // Synchronous wait because this public method is synchronous.
              // GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
              client.ConnectAsync(options).GetAwaiter().GetResult();

              // Subscribe to the configured topic.
              SubTopics(_subTopic);
          }
          catch (Exception ex)
          {
              Console.WriteLine($"MQTT connect failed. clientId: {_clientId}, error: {ex.Message}");
              _connected = false;
          }
      }

      /// <summary>
      /// Subscribes the current client to a topic. The topic is used as a format string
      /// whose <c>{0}</c> placeholder, if present, is replaced by the client id.
      /// </summary>
      /// <param name="topic">Topic or topic format string. Null, empty or blank values are ignored.</param>
      /// <remarks>
      /// Fire-and-forget: the method returns before the subscription completes, and
      /// failures are written to the console instead of being thrown.
      /// </remarks>
      public async void SubTopics(string topic)
      {
          // Checked before Trim(): an exception escaping an async void method cannot be
          // caught by the caller, so a missing configuration value must not throw here.
          if (string.IsNullOrWhiteSpace(topic))
          {
              return;
          }

          // Work on a local copy; a concurrent reconnect may replace the field.
          IMqttClient client = _client;
          if (client == null || !client.IsConnected)
          {
              return;
          }

          string filter = topic.Trim();
          try
          {
              // Inside the try block because a malformed format string throws FormatException.
              filter = string.Format(filter, _clientId);

              var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
                  .WithTopicFilter(filter, MqttQualityOfServiceLevel.ExactlyOnce)
                  .Build();

              await client.SubscribeAsync(
                  subscribeOptions,
                  System.Threading.CancellationToken.None);
              Console.WriteLine("监听主题:" + filter);
          }
          catch (Exception e)
          {
              Console.WriteLine($"消息监听失败.监听主题:{filter}, 失败原因:{e.Message}");
          }
      }

      /// <summary>
      /// Handles a received message: decodes the payload as UTF-8 text and pushes it,
      /// together with its topic, to <see cref="MsWorkstation"/>.
      /// </summary>
      /// <param name="arg">Event data carrying the received application message.</param>
      /// <returns>A completed task.</returns>
      private Task ReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
      {
          // A message without payload is forwarded as an empty string
          // (Encoding.GetString rejects a null array).
          byte[] payload = arg.ApplicationMessage.Payload;
          string content = payload == null
              ? string.Empty
              : System.Text.Encoding.UTF8.GetString(payload);

          // GetRequiredService throws when the service is missing; it never returns null.
          MsWorkstation ws = _service.GetRequiredService<MsWorkstation>();
          ws.PushMqMessage(arg.ApplicationMessage.Topic, content);
          return Task.CompletedTask;
      }

      /// <summary>
      /// Handles a lost connection: waits two seconds, then starts a reconnect.
      /// </summary>
      /// <param name="arg">Event data describing the disconnect (not inspected).</param>
      /// <returns>A task that completes when the reconnect attempt has finished.</returns>
      private async Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
      {
          // Asynchronous delay: does not hold a thread while waiting.
          await Task.Delay(ReconnectDelay);
          reConnect();
      }

      /// <summary>
      /// Disposes the current client and creates a new connection. Calls made while a
      /// reconnect is already running return immediately.
      /// </summary>
      public void reConnect()
      {
          lock (_reconnectGate)
          {
              if (_reconnecting)
              {
                  return;
              }

              // Only one caller gets past this point at a time.
              _reconnecting = true;
          }

          try
          {
              _connected = false;

              IMqttClient stale = _client;
              _client = null;
              ReleaseClient(stale);

              // Reconnect.
              CreateClient();
          }
          catch (Exception ex)
          {
              Console.WriteLine($"MQTT reconnect failed. clientId: {_clientId}, error: {ex.Message}");
          }
          finally
          {
              // Always cleared; otherwise a single failure would block every later reconnect.
              lock (_reconnectGate)
              {
                  _reconnecting = false;
              }
          }
      }

      /// <summary>
      /// Handles a successful connection by recording the connected state.
      /// </summary>
      /// <param name="arg">Event data describing the connection (not inspected).</param>
      /// <returns>A completed task.</returns>
      private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
      {
          _connected = true;
          return Task.CompletedTask;
      }

      /// <summary>
      /// Publishes a message on the configured default send topic.
      /// </summary>
      /// <param name="msg">Message text.</param>
      /// <returns>True when the message was handed to the client for publishing; otherwise false.</returns>
      public bool SendToMqtt(string msg)
      {
          return SendToMqtt(_sendTopic, msg);
      }

      /// <summary>
      /// Publishes a message on the given topic.
      /// </summary>
      /// <param name="topic">Target topic. Null, empty or blank values are rejected.</param>
      /// <param name="msg">Message text. Null is rejected.</param>
      /// <returns>
      /// True when the client is connected and the publish was started. Publishing is
      /// fire-and-forget, so true does not confirm delivery. False when the arguments
      /// are invalid or no connected client exists.
      /// </returns>
      public bool SendToMqtt(string topic, string msg)
      {
          try
          {
              if (string.IsNullOrWhiteSpace(topic) || msg == null)
              {
                  return false;
              }

              IMqttClient client = _client;

              // Same condition ClientPublish applies, so true is never returned for a
              // message that would be dropped there.
              if (client == null || !_connected || !client.IsConnected)
              {
                  return false;
              }

              // Not awaited on purpose; ClientPublish handles its own failures.
              _ = ClientPublish(topic, msg);
              return true;
          }
          catch
          {
              return false;
          }
      }

      /// <summary>
      /// Sends a message with QoS "exactly once" and without the retain flag.
      /// </summary>
      /// <param name="topic">Target topic; surrounding whitespace is removed.</param>
      /// <param name="message">Message text; surrounding whitespace is removed.</param>
      /// <returns>A task that completes when the publish has finished or failed; it never faults.</returns>
      private async Task ClientPublish(string topic, string message)
      {
          if (string.IsNullOrWhiteSpace(topic) || message == null)
          {
              return;
          }

          // Work on a local copy; a concurrent reconnect may replace the field.
          IMqttClient client = _client;
          if (!_connected || client == null || !client.IsConnected)
          {
              return;
          }

          try
          {
              var applicationMessage = new MqttApplicationMessageBuilder()
                  .WithTopic(topic.Trim())
                  .WithPayload(message.Trim())
                  .WithRetainFlag(false) // not retained by the broker
                  .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
                  .Build();

              await client.PublishAsync(applicationMessage);
          }
          catch (Exception ex)
          {
              Console.WriteLine($"MQTT publish failed. topic: {topic}, error: {ex.Message}");
          }
      }

      /// <summary>
      /// Detaches this wrapper's handlers from a client instance and disposes it.
      /// </summary>
      /// <param name="client">Client to release; null is ignored.</param>
      private void ReleaseClient(IMqttClient client)
      {
          if (client == null)
          {
              return;
          }

          // Detach first so a discarded instance can no longer call back into this wrapper.
          client.ApplicationMessageReceivedAsync -= ReceivedHandle;
          client.ConnectedAsync -= ConnectedHandle;
          client.DisconnectedAsync -= DisconnectedHandle;
          client.Dispose();
      }
  }
  252. }